Add mechanism to check whether the test can be run or not according to the scenario...
[functest.git] / testcases / functest_utils.py
1 #!/usr/bin/env python
2 #
3 # jose.lausuch@ericsson.com
4 # valentin.boucher@orange.com
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10
11
12 import os
13 import os.path
14 import urllib2
15 import subprocess
16 import sys
17 import requests
18 import json
19 import shutil
20 from git import Repo
21
22
23 # ############ CREDENTIALS OPENSTACK #############
def check_credentials():
    """
    Tell whether the OpenStack credentials (openrc) have been sourced.

    The result is truthy only when each of OS_AUTH_URL, OS_USERNAME,
    OS_PASSWORD and OS_TENANT_NAME is present in the environment with a
    non-empty value.
    """
    required = ('OS_AUTH_URL', 'OS_USERNAME', 'OS_PASSWORD', 'OS_TENANT_NAME')
    for var_name in required:
        # a missing variable and an empty one are both treated as "not set"
        if not os.environ.get(var_name):
            return False
    return True
30
31
def get_credentials(service):
    """Build the credentials dictionary expected by an OpenStack client.

    The returned dictionary holds:
    * username
    * password, or api_key when service is nova or cinder
    * tenant_name, or project_id when service is nova or cinder
    * auth_url

    :param service: name of the service requesting the credentials
                    (compared case-insensitively).
    """
    # The nova and cinder clients name two of the entries differently from
    # the other clients.
    if service.lower() in ("nova", "cinder"):
        password_key, tenant_key = "api_key", "project_id"
    else:
        password_key, tenant_key = "password", "tenant_name"

    # Values come from the environment; each one falls back to a default
    # when the variable is not set.
    env = os.environ
    return {
        "username": env.get('OS_USERNAME', "admin"),
        password_key: env.get("OS_PASSWORD", 'admin'),
        "auth_url": env.get("OS_AUTH_URL", "http://192.168.20.71:5000/v2.0"),
        tenant_key: env.get("OS_TENANT_NAME", "admin"),
    }
62
63
64 # ################ NOVA #################
def get_instances(nova_client):
    """
    Return the servers listed by nova_client for all tenants.

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit are left to propagate.
    """
    try:
        return nova_client.servers.list(search_opts={'all_tenants': 1})
    except Exception:
        return None
71
72
def get_instance_status(nova_client, instance):
    """
    Return the status of an instance as currently reported by nova.

    The instance is fetched again through nova_client.servers.get() using
    instance.id. None is returned when that lookup raises an ordinary
    exception; KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return nova_client.servers.get(instance.id).status
    except Exception:
        return None
79
80
def get_instance_by_name(nova_client, instance_name):
    """
    Return the server found by nova_client.servers.find(name=instance_name).

    None is returned when the lookup raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return nova_client.servers.find(name=instance_name)
    except Exception:
        return None
87
88
def get_flavor_id(nova_client, flavor_name):
    """
    Return the id of the first flavor named flavor_name.

    An empty string is returned when no flavor has that name.
    """
    matching_ids = (flavor.id
                    for flavor in nova_client.flavors.list(detailed=True)
                    if flavor.name == flavor_name)
    return next(matching_ids, '')
97
98
def get_flavor_id_by_ram_range(nova_client, min_ram, max_ram):
    """
    Return the id of the first flavor whose ram lies in [min_ram, max_ram].

    Both bounds are inclusive. An empty string is returned when no flavor
    falls in the range.
    """
    in_range_ids = (flavor.id
                    for flavor in nova_client.flavors.list(detailed=True)
                    if min_ram <= flavor.ram <= max_ram)
    return next(in_range_ids, '')
107
108
def delete_instance(nova_client, instance_id):
    """
    Force-delete the server identified by instance_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        nova_client.servers.force_delete(instance_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
116
117
def get_floating_ips(nova_client):
    """
    Return the result of nova_client.floating_ips.list().

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return nova_client.floating_ips.list()
    except Exception:
        return None
124
125
def delete_floating_ip(nova_client, floatingip_id):
    """
    Delete the floating IP identified by floatingip_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned, matching the other delete_* helpers of
    this module (the value is still falsy, as before). KeyboardInterrupt and
    SystemExit propagate.
    """
    try:
        nova_client.floating_ips.delete(floatingip_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
133
134
135 # ################ NEUTRON #################
def create_neutron_net(neutron_client, name):
    """
    Create a network called name with admin_state_up set to True.

    Returns the id of the created network. On an ordinary exception the
    exception type is printed and False is returned; KeyboardInterrupt and
    SystemExit propagate.
    """
    json_body = {'network': {'name': name,
                             'admin_state_up': True}}
    try:
        network = neutron_client.create_network(body=json_body)
        return network['network']['id']
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
146
147
def update_neutron_net(neutron_client, network_id, shared=False):
    """
    Set the 'shared' attribute of the network identified by network_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    json_body = {'network': {'shared': shared}}
    try:
        neutron_client.update_network(network_id, body=json_body)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
156
157
def delete_neutron_net(neutron_client, network_id):
    """
    Delete the network identified by network_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.delete_network(network_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
165
166
def create_neutron_subnet(neutron_client, name, cidr, net_id):
    """
    Create an IPv4 subnet called name with the given cidr on network net_id.

    Returns the id of the created subnet. On an ordinary exception the
    exception type is printed and False is returned; KeyboardInterrupt and
    SystemExit propagate.
    """
    json_body = {'subnets': [{'name': name, 'cidr': cidr,
                              'ip_version': 4, 'network_id': net_id}]}
    try:
        subnet = neutron_client.create_subnet(body=json_body)
        return subnet['subnets'][0]['id']
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
176
177
def delete_neutron_subnet(neutron_client, subnet_id):
    """
    Delete the subnet identified by subnet_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.delete_subnet(subnet_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
185
186
def create_neutron_router(neutron_client, name):
    """
    Create a router called name with admin_state_up set to True.

    Returns the id of the created router. On an ordinary exception the
    exception type is printed and False is returned; KeyboardInterrupt and
    SystemExit propagate.
    """
    json_body = {'router': {'name': name, 'admin_state_up': True}}
    try:
        router = neutron_client.create_router(json_body)
        return router['router']['id']
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
195
196
def delete_neutron_router(neutron_client, router_id):
    """
    Delete the router identified by router_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.delete_router(router=router_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
205
206
def add_interface_router(neutron_client, router_id, subnet_id):
    """
    Attach the subnet identified by subnet_id to the router router_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.add_interface_router(router=router_id, body=json_body)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
215
216
def remove_interface_router(neutron_client, router_id, subnet_id):
    """
    Detach the subnet identified by subnet_id from the router router_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    json_body = {"subnet_id": subnet_id}
    try:
        neutron_client.remove_interface_router(router=router_id,
                                               body=json_body)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
226
227
def remove_gateway_router(neutron_client, router_id):
    """
    Remove the gateway of the router identified by router_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.remove_gateway_router(router_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
235
236
def create_neutron_port(neutron_client, name, network_id, ip):
    """
    Create a port called name on network_id with the fixed address ip.

    The port is created with admin_state_up set to True. Returns the id of
    the created port. On an ordinary exception the exception type is printed
    and False is returned; KeyboardInterrupt and SystemExit propagate.
    """
    json_body = {'port': {
        'admin_state_up': True,
        'name': name,
        'network_id': network_id,
        'fixed_ips': [{"ip_address": ip}],
    }}
    try:
        port = neutron_client.create_port(body=json_body)
        return port['port']['id']
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
250
251
def update_neutron_port(neutron_client, port_id, device_owner):
    """
    Set the device_owner attribute of the port identified by port_id.

    Returns the id of the updated port as reported by the client. On an
    ordinary exception the exception type is printed and False is returned;
    KeyboardInterrupt and SystemExit propagate.
    """
    json_body = {'port': {
        'device_owner': device_owner,
    }}
    try:
        port = neutron_client.update_port(port=port_id,
                                          body=json_body)
        return port['port']['id']
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
263
264
def delete_neutron_port(neutron_client, port_id):
    """
    Delete the port identified by port_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.delete_port(port_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
272
273
def get_network_id(neutron_client, network_name):
    """
    Return the id of the first network named network_name.

    An empty string is returned when no network has that name.
    """
    matching_ids = (net['id']
                    for net in neutron_client.list_networks()['networks']
                    if net['name'] == network_name)
    return next(matching_ids, '')
282
283
def check_neutron_net(neutron_client, net_name):
    """
    Tell whether a network named net_name with at least one subnet exists.

    Networks with that name but no subnet are skipped, so a later network
    with the same name can still make the result True.
    """
    for net in neutron_client.list_networks()['networks']:
        if net['name'] != net_name:
            continue
        # true as soon as the subnet collection yields a first element
        if any(True for _ in net['subnets']):
            return True
    return False
290
291
def get_network_list(neutron_client):
    """
    Return the networks reported by neutron_client, or None if there are none.
    """
    networks = neutron_client.list_networks()['networks']
    return networks if len(networks) != 0 else None
298
299
def get_router_list(neutron_client):
    """
    Return the routers reported by neutron_client, or None if there are none.
    """
    routers = neutron_client.list_routers()['routers']
    return routers if len(routers) != 0 else None
306
307
def get_port_list(neutron_client):
    """
    Return the ports reported by neutron_client, or None if there are none.
    """
    ports = neutron_client.list_ports()['ports']
    return ports if len(ports) != 0 else None
314
315
def get_external_net(neutron_client):
    """
    Return the name of the first network flagged 'router:external'.

    False is returned when no network carries a truthy 'router:external'.
    """
    external_names = (net['name']
                      for net in neutron_client.list_networks()['networks']
                      if net['router:external'])
    return next(external_names, False)
321
322
def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
    """
    Update the security group quotas of the tenant identified by tenant_id.

    sg_quota is sent as "security_group" and sg_rule_quota as
    "security_group_rule". Returns True on success. On an ordinary exception
    the exception type is printed and False is returned; KeyboardInterrupt
    and SystemExit propagate.
    """
    json_body = {"quota": {
        "security_group": sg_quota,
        "security_group_rule": sg_rule_quota
    }}

    try:
        neutron_client.update_quota(tenant_id=tenant_id, body=json_body)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
336
337
def update_cinder_quota(cinder_client, tenant_id, vols_quota,
                        snapshots_quota, gigabytes_quota):
    """
    Update the volume quotas of the tenant identified by tenant_id.

    The three values are passed to cinder_client.quotas.update() as the
    keyword arguments volumes, snapshots and gigabytes. Returns True on
    success. On an ordinary exception the exception type is printed and
    False is returned; KeyboardInterrupt and SystemExit propagate.
    """
    quotas_values = {"volumes": vols_quota,
                     "snapshots": snapshots_quota,
                     "gigabytes": gigabytes_quota}

    try:
        cinder_client.quotas.update(tenant_id, **quotas_values)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
351
352
def get_private_net(neutron_client):
    """
    Return the first network that is shared and not external.

    A network qualifies only when 'router:external' is exactly False and
    'shared' is exactly True. None is returned when no network qualifies,
    including when there is no network at all.
    """
    candidates = (net
                  for net in neutron_client.list_networks()['networks']
                  if net['router:external'] is False and net['shared'] is True)
    return next(candidates, None)
362
363
364 # ################ GLANCE #################
def get_images(nova_client):
    """
    Return the result of nova_client.images.list().

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return nova_client.images.list()
    except Exception:
        return None
371
372
def get_image_id(glance_client, image_name):
    """
    Return the id of the first image named image_name.

    An empty string is returned when no image has that name.
    """
    matching_ids = (image.id
                    for image in glance_client.images.list()
                    if image.name == image_name)
    return next(matching_ids, '')
381
382
def create_glance_image(glance_client, image_name, file_path, public=True):
    """
    Upload the file at file_path as a qcow2/bare image called image_name.

    Returns the id of the created image. False is returned, after printing
    a message, when file_path is not an existing file or when the upload
    raises an ordinary exception; KeyboardInterrupt and SystemExit
    propagate.
    """
    if not os.path.isfile(file_path):
        print("Error: file " + file_path + " does not exist.")
        return False
    try:
        # Image files are binary data: open in 'rb' so the bytes reach the
        # client untouched (no newline translation or text decoding).
        with open(file_path, 'rb') as fimage:
            image = glance_client.images.create(name=image_name,
                                                is_public=public,
                                                disk_format="qcow2",
                                                container_format="bare",
                                                data=fimage)
        return image.id
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
398
399
def delete_glance_image(nova_client, image_id):
    """
    Delete the image identified by image_id through nova_client.images.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        nova_client.images.delete(image_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
407
408
409 # ################ CINDER #################
def get_volumes(cinder_client):
    """
    Return the volumes listed by cinder_client for all tenants.

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return cinder_client.volumes.list(search_opts={'all_tenants': 1})
    except Exception:
        return None
416
417
def delete_volume(cinder_client, volume_id):
    """
    Delete the volume identified by volume_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        cinder_client.volumes.delete(volume_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
425
426
# ################ SECURITY GROUPS #################
def get_security_groups(neutron_client):
    """
    Return the security groups reported by neutron_client.

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return neutron_client.list_security_groups()['security_groups']
    except Exception:
        return None
434
435
def delete_security_group(neutron_client, secgroup_id):
    """
    Delete the security group identified by secgroup_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        neutron_client.delete_security_group(secgroup_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
443
444
445 # ################ KEYSTONE #################
def get_tenants(keystone_client):
    """
    Return the result of keystone_client.tenants.list().

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return keystone_client.tenants.list()
    except Exception:
        return None
452
453
def get_tenant_id(keystone_client, tenant_name):
    """
    Return the id of the first tenant named tenant_name.

    An empty string is returned when no tenant has that name.
    """
    matching_ids = (tenant.id
                    for tenant in keystone_client.tenants.list()
                    if tenant.name == tenant_name)
    return next(matching_ids, '')
462
463
def get_users(keystone_client):
    """
    Return the result of keystone_client.users.list().

    None is returned when the listing raises an ordinary exception;
    KeyboardInterrupt and SystemExit propagate.
    """
    try:
        return keystone_client.users.list()
    except Exception:
        return None
470
471
def get_role_id(keystone_client, role_name):
    """
    Return the id of the first role named role_name.

    An empty string is returned when no role has that name.
    """
    matching_ids = (role.id
                    for role in keystone_client.roles.list()
                    if role.name == role_name)
    return next(matching_ids, '')
480
481
def get_user_id(keystone_client, user_name):
    """
    Return the id of the first user named user_name.

    An empty string is returned when no user has that name.
    """
    matching_ids = (user.id
                    for user in keystone_client.users.list()
                    if user.name == user_name)
    return next(matching_ids, '')
490
491
def create_tenant(keystone_client, tenant_name, tenant_description):
    """
    Create an enabled tenant with the given name and description.

    Returns the id of the created tenant. On an ordinary exception the
    exception type is printed and False is returned; KeyboardInterrupt and
    SystemExit propagate.
    """
    try:
        tenant = keystone_client.tenants.create(tenant_name,
                                                tenant_description,
                                                enabled=True)
        return tenant.id
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
501
502
def delete_tenant(keystone_client, tenant_id):
    """
    Delete the tenant identified by tenant_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        keystone_client.tenants.delete(tenant_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
510
511
def create_user(keystone_client, user_name, user_password,
                user_email, tenant_id):
    """
    Create an enabled user attached to the tenant identified by tenant_id.

    Returns the id of the created user. On an ordinary exception the
    exception type is printed and False is returned; KeyboardInterrupt and
    SystemExit propagate.
    """
    try:
        user = keystone_client.users.create(user_name, user_password,
                                            user_email, tenant_id,
                                            enabled=True)
        return user.id
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
522
523
def delete_user(keystone_client, user_id):
    """
    Delete the user identified by user_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        keystone_client.users.delete(user_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
531
532
def add_role_user(keystone_client, user_id, role_id, tenant_id):
    """
    Grant the role role_id to the user user_id on the tenant tenant_id.

    Returns True on success. On an ordinary exception the exception type is
    printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    try:
        keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
    return True
540
541
542 # ################ UTILS #################
def check_internet_connectivity(url='http://www.opnfv.org/'):
    """
    Check if there is access to the internet

    The url is opened with a 5 second timeout. Returns True when the
    request succeeds and False when it fails with urllib2.URLError or times
    out.
    """
    # function-scope import: socket is not among the module-level imports
    import socket

    try:
        response = urllib2.urlopen(url, timeout=5)
    except (urllib2.URLError, socket.timeout):
        # a timeout while waiting for the response can surface as a raw
        # socket.timeout rather than being wrapped in URLError
        return False
    # only reachability matters: release the connection immediately
    response.close()
    return True
552
553
def download_url(url, dest_path):
    """
    Download a file to a destination path given a URL

    The file is stored in dest_path under the last '/'-separated component
    of url. Returns True once the content has been copied, False when the
    URL cannot be opened (urllib2.HTTPError / urllib2.URLError).
    """
    # function-scope import: contextlib is not among the module-level imports
    from contextlib import closing

    name = url.rsplit('/')[-1]
    dest = dest_path + "/" + name
    try:
        response = urllib2.urlopen(url)
    except (urllib2.HTTPError, urllib2.URLError):
        return False

    # closing() releases the HTTP response even if writing the file fails
    with closing(response):
        with open(dest, 'wb') as f:
            shutil.copyfileobj(response, f)
    return True
568
569
def execute_command(cmd, logger=None):
    """
    Execute Linux command

    The command runs through the shell with stdout and stderr redirected to
    the file "output.txt" in the current directory; the captured output is
    sent to logger.debug when a logger is given.

    Returns True when the command exits with status 0. Any other status
    logs an error (when a logger is given) and terminates the process by
    raising SystemExit(-1).
    """
    if logger:
        logger.debug('Executing command : {}'.format(cmd))
    output_file = "output.txt"
    # NOTE(review): cmd is handed to the shell as-is (shell=True); callers
    # must never build it from untrusted input.
    with open(output_file, 'w+') as out:
        returncode = subprocess.call(cmd, shell=True, stdout=out,
                                     stderr=subprocess.STDOUT)
    # context managers make sure both handles on output.txt are closed
    with open(output_file, 'r') as out:
        result = out.read()
    if result != "" and logger:
        logger.debug(result)
    if returncode == 0:
        return True
    if logger:
        logger.error("Error when executing command %s" % cmd)
    # sys.exit is always available, unlike the site-provided exit() builtin
    sys.exit(-1)
590
591
def get_git_branch(repo_path):
    """
    Return the name of the active branch of the git repository at repo_path.
    """
    return Repo(repo_path).active_branch.name
599
600
def get_installer_type(logger=None):
    """
    Return the installer type (fuel, foreman, apex, joid, compass).

    The value is read from the INSTALLER_TYPE environment variable. When
    the variable is not set, an error is logged (if a logger is given) and
    the placeholder "Unkown" is returned.
    """
    if 'INSTALLER_TYPE' in os.environ:
        return os.environ['INSTALLER_TYPE']

    if logger:
        logger.error("Impossible to retrieve the installer type")
    # NOTE: the spelling of this placeholder is part of the returned value
    # and is kept as-is.
    return "Unkown"
613
614
def get_pod_name(logger=None):
    """
    Return the PoD name read from the NODE_NAME environment variable.

    When the variable is not set, an error is logged (if a logger is given)
    and "unknown-pod" is returned.
    """
    if 'NODE_NAME' in os.environ:
        return os.environ['NODE_NAME']

    if logger:
        logger.error("Unable to retrieve the POD name from environment.Using pod name 'unknown-pod'")
    return "unknown-pod"
625
626
def push_results_to_db(db_url, case_name, logger, pod_name,
                       git_version, payload):
    """
    POST a functest result as JSON to "<db_url>/results".

    The document contains the project name ("functest"), case_name,
    pod_name, the installer type returned by get_installer_type(),
    git_version and payload (under "details").

    Returns True when the POST call returns; the HTTP status of the
    response is not inspected. On an ordinary exception the exception type
    is printed and False is returned; KeyboardInterrupt and SystemExit
    propagate.
    """
    url = db_url + "/results"
    installer = get_installer_type(logger)
    params = {"project_name": "functest", "case_name": case_name,
              "pod_name": pod_name, "installer": installer,
              "version": git_version, "details": payload}

    headers = {'Content-Type': 'application/json'}
    try:
        r = requests.post(url, data=json.dumps(params), headers=headers)
        # logger may be None, as get_installer_type() already tolerates;
        # a missing logger must not turn a successful push into a failure
        if logger:
            logger.debug(r)
        return True
    except Exception:
        print("Error: %s" % sys.exc_info()[0])
        return False
643
644
def getTestEnv(test, functest_yaml):
    """
    Return the dependencies declared for a test in the functest config.

    The lookup is made in the "test-dependencies" section of functest_yaml.
    test is either:
    - a test project, e.g. ovno          -> section[test]
    - a testcase, e.g. functest/odl      -> section[project][testcase]
    The presence of a "/" decides which form is used.

    An empty string is returned when nothing is declared for the test
    (no dependencies). It is also returned, after printing the exception
    type, when the lookup fails for any other reason.
    """
    # Default to "no dependencies". Binding the name up front also keeps
    # the final return valid when the generic handler below is taken.
    config_test = ""
    try:
        TEST_ENV = functest_yaml.get("test-dependencies")

        if "/" not in test:
            config_test = TEST_ENV[test]
        else:
            test_split = test.split("/")
            testproject = test_split[0]
            testcase = test_split[1]
            config_test = TEST_ENV[testproject][testcase]
    except KeyError:
        # if not defined in dependencies => no dependencies
        config_test = ""
    except Exception:
        print("Error getTestEnv: %s" % sys.exc_info()[0])

    return config_test
668
669
def get_ci_envvars():
    """
    Return the CI settings read from the environment.

    The dictionary maps "installer", "controller" and "options" to the
    values of INSTALLER_TYPE, SDN_CONTROLLER and OPNFV_FEATURE; a variable
    that is not set yields None.
    """
    sources = (("installer", 'INSTALLER_TYPE'),
               ("controller", 'SDN_CONTROLLER'),
               ("options", "OPNFV_FEATURE"))
    return dict((key, os.environ.get(var_name)) for key, var_name in sources)
679
680
def isTestRunnable(test, functest_yaml):
    """
    Tell whether a test can run on the current CI environment.

    The dependencies returned by getTestEnv(test, functest_yaml) are
    compared, criterion by criterion, with the values returned by
    get_ci_envvars() (e.g. a test needing onos is not run on an odl
    deployment). Any mismatch makes the test not runnable.

    A test without dependencies is runnable. If an error occurs during the
    check, its type is printed and the verdict reached so far is returned.
    """
    # By default every test is assumed to be runnable
    runnable = True
    try:
        # What the CI deployment provides
        ci_settings = get_ci_envvars()

        # What the test requires according to the config file
        requirements = getTestEnv(test, functest_yaml)

        has_requirements = (requirements is not None and
                            len(requirements) > 0)
        if has_requirements:
            # every criterion is visited: there is deliberately no early
            # exit after the first mismatch
            for criterion in requirements:
                if requirements[criterion] != ci_settings[criterion]:
                    runnable = False
    except:
        print("Error isTestRunnable: %s" % sys.exc_info()[0])
    return runnable
706
707
def generateTestcaseList(functest_yaml):
    """
    Build the list of tests runnable on the current CI environment.

    The "test-dependencies" section of functest_yaml is walked project by
    project:
    - a project without any entry is always listed;
    - a project carrying "controller"/"scenario" keys (1 level) is listed
      once, under its own name, when isTestRunnable(project) accepts it;
    - any other key is a testcase (2 levels, project/testcase) and is
      listed under the testcase name when isTestRunnable accepts it.

    The names, each followed by a space, are written to the file
    "testcase-list.txt" (consumed by run-test.sh) and returned as a string.
    On an ordinary exception the exception type is printed and None is
    returned.
    """
    try:
        runnable = []

        # get testcases
        testcase_list = functest_yaml.get("test-dependencies")
        for project in testcase_list.keys():
            testcases = testcase_list[project]
            # if only project name without controller or scenario
            # => shall be runnable on any controller/scenario
            if testcases is None:
                runnable.append(project)
                continue

            # a project may declare both "controller" and "scenario":
            # evaluate and list it only once
            project_checked = False
            for testcase in testcases:
                if testcase == "controller" or testcase == "scenario":
                    # project (1 level)
                    if project_checked:
                        continue
                    project_checked = True
                    if isTestRunnable(project, functest_yaml):
                        runnable.append(project)
                else:
                    # project/testcase (2 levels)
                    thetest = project + "/" + testcase
                    if isTestRunnable(thetest, functest_yaml):
                        runnable.append(testcase)

        # every name is followed by a single space, trailing one included
        test_list = "".join(name + " " for name in runnable)

        # create a file that could be consumed by run-test.sh
        with open("testcase-list.txt", 'w') as list_file:
            list_file.write(test_list)

        return test_list
    except Exception:
        print("Error generateTestcaseList: %s" % sys.exc_info()[0])
        return None