Modify TestCase constructor attributes
[functest.git] / functest / opnfv_tests / sdn / onos / onos.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import os
11 import re
12 import subprocess
13 import shutil
14 import time
15 import urlparse
16
17 from functest.core import testcase
18 from functest.utils.constants import CONST
19 import functest.utils.functest_logger as ft_logger
20 import functest.utils.functest_utils as ft_utils
21 import functest.utils.openstack_utils as openstack_utils
22
23
24 logger = ft_logger.Logger(__name__).getLogger()
25
26
class OnosBase(testcase.TestCase):
    """Common base for the ONOS test cases.

    Holds the paths shared by the ONOS cases and implements ``run()``,
    which times the case, delegates the actual work to ``_run()`` and
    translates any exception into an error return code.
    """

    # Checkout location of the ONOS repository.
    onos_repo_path = CONST.dir_repo_onos
    # Name under which the SFC image is registered.
    onos_sfc_image_name = CONST.onos_sfc_image_name
    # Location of the SFC image file inside the functest data directory.
    onos_sfc_image_path = os.path.join(CONST.dir_functest_data,
                                       CONST.onos_sfc_image_file_name)
    # Directory of the ONOS SFC scripts inside the functest repository.
    onos_sfc_path = os.path.join(CONST.dir_repo_functest,
                                 CONST.dir_onos_sfc)

    def run(self):
        """Execute ``_run()`` and report the outcome.

        ``start_time`` and ``stop_time`` are recorded around the call.

        Returns:
            ``testcase.TestCase.EX_OK`` when ``_run()`` completes,
            ``testcase.TestCase.EX_RUN_ERROR`` when it raises.
        """
        self.start_time = time.time()
        try:
            self._run()
        except Exception as err:
            # Any failure of the case is logged and reported via the
            # return code instead of being propagated to the caller.
            logger.error('Error with run: %s', err)
            exit_code = testcase.TestCase.EX_RUN_ERROR
        else:
            exit_code = testcase.TestCase.EX_OK

        self.stop_time = time.time()
        return exit_code

    def _run(self):
        """Perform the test case; concrete subclasses must override it."""
        raise NotImplementedError('_run is not implemented')
49
50
class Onos(OnosBase):
    """ONOS functional test case driven by the TestON scripts.

    Runs the ``FUNCvirNetNB`` and ``FUNCvirNetNBL3`` TestON scripts and
    derives the case criteria from the logs they leave in ``log_path``.
    """

    # Per-case summary lines: "<h3>name - description - result</h3>".
    _CASE_PATTERN = re.compile(r"<h3>([^-]+) - ([^-]+) - (\S*)</h3>")

    def __init__(self, **kwargs):
        """Create the case, naming it "onos" unless ``case_name`` is given."""
        if "case_name" not in kwargs:
            kwargs["case_name"] = "onos"
        super(Onos, self).__init__(**kwargs)
        self.log_path = os.path.join(self.onos_repo_path, 'TestON/logs')

    def set_onos_ip(self):
        """Export the ONOS controller address as the ``OC1`` env variable.

        For the 'joid' installer the address is the first IPv4 address
        found in ``SDN_CONTROLLER``; otherwise it is the host name of the
        'network' service endpoint.

        Raises:
            ValueError: for 'joid', if ``SDN_CONTROLLER`` is unset or
                contains no IPv4 address.
        """
        if (CONST.INSTALLER_TYPE and
                CONST.INSTALLER_TYPE.lower() == 'joid'):
            # Default to '' so that a missing variable yields the explicit
            # error below instead of a TypeError inside re.search().
            sdn_controller_env = os.getenv('SDN_CONTROLLER', '')
            match = re.search(r"\d+\.\d+\.\d+\.\d+", sdn_controller_env)
            if match is None:
                raise ValueError(
                    'No IPv4 address found in SDN_CONTROLLER: %r'
                    % sdn_controller_env)
            OC1 = match.group()
        else:
            neutron_url = openstack_utils.get_endpoint(service_type='network')
            OC1 = urlparse.urlparse(neutron_url).hostname
        os.environ['OC1'] = OC1
        logger.debug("ONOS IP is %s", OC1)

    def run_onos_script(self, testname):
        """Run the TestON script ``testname`` through ``TestON/bin/cli.py``."""
        cli_dir = os.path.join(self.onos_repo_path, 'TestON/bin/cli.py')
        cmd = '{0} run {1}'.format(cli_dir, testname)
        logger.debug("Run script: %s", testname)
        ft_utils.execute_command_raise(
            cmd,
            error_msg=('Error when running ONOS script: %s'
                       % (testname)))

    def clean_existing_logs(self):
        """Delete every file and directory found directly in ``log_path``.

        A failure to delete one entry is logged and does not stop the
        cleanup of the remaining entries.
        """
        for name in os.listdir(self.log_path):
            # os.listdir() returns bare entry names; they must be joined
            # with log_path, otherwise the checks and deletions below
            # would be resolved against the current working directory.
            log = os.path.join(self.log_path, name)
            try:
                if os.path.isdir(log):
                    shutil.rmtree(log)
                elif os.path.isfile(log):
                    os.remove(log)
            except OSError as e:
                logger.error('Error with deleting file %s: %s',
                             log, e.strerror)

    @staticmethod
    def _grep(pattern, target, merge_stderr=False):
        """Return the output of ``grep -rnh 'pattern' target``.

        The command goes through the shell so that wildcards in
        ``target`` are expanded. The child is waited for and its pipe is
        closed before returning.
        """
        cmd = "grep -rnh '{0}' {1}".format(pattern, target)
        proc = subprocess.Popen(
            cmd,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if merge_stderr else None)
        output, _ = proc.communicate()
        return output

    def _suite_result(self, logs):
        """Return 'Success' if the suite logs report 100%, else 'Failed'."""
        if '100%' in self._grep('Success Percentage', logs):
            return 'Success'
        return 'Failed'

    def _suite_status(self, logs):
        """Return one name/result dict per ``<h3>`` case line in the logs."""
        cases = self._CASE_PATTERN.findall(self._grep('h3', logs))
        return [{'Case name:': name + description,
                 'Case result': result}
                for name, description, result in cases]

    def get_result(self):
        """Collect the results of both TestON suites from the logs.

        Returns:
            A dict with the keys 'FUNCvirNet' and 'FUNCvirNetL3', each
            mapping to a dict holding 'duration', 'result' ('Success' or
            'Failed') and 'status' (list of per-case dicts).
        """
        failures = self._grep('Fail', self.log_path, merge_stderr=True)
        for line in failures.splitlines(True):
            logger.debug(line)
            if re.search(r"\s+[1-9]+\s+", line):
                logger.debug("Testcase Fails\n" + line)

        # The durations are cut out of the grep output at fixed character
        # offsets. NOTE(review): this presumably relies on exactly two
        # 'Execution Time' lines in a stable format and order - confirm.
        durations = self._grep('Execution Time', self.log_path)
        time1 = durations[114:128]
        time2 = durations[28:42]

        nb_logs = os.path.join(self.log_path, "FUNCvirNetNB_*")
        l3_logs = os.path.join(self.log_path, "FUNCvirNetNBL3*")
        payload = {'FUNCvirNet': {'duration': time1,
                                  'result': self._suite_result(nb_logs),
                                  'status': self._suite_status(nb_logs)},
                   'FUNCvirNetL3': {'duration': time2,
                                    'result': self._suite_result(l3_logs),
                                    'status': self._suite_status(l3_logs)}}
        return payload

    def parse_result(self):
        """Set ``criteria`` and ``details`` from ``get_result()``.

        ``criteria`` is "PASS" only when both suites report 'Success';
        ``details`` receives the complete result payload.
        """
        result = self.get_result()
        status = "FAIL"
        try:
            if (result['FUNCvirNet']['result'] == "Success" and
                    result['FUNCvirNetL3']['result'] == "Success"):
                status = "PASS"
        except (KeyError, TypeError):
            # Only a malformed payload is tolerated here; anything else
            # (including KeyboardInterrupt) is left to propagate.
            logger.error("Unable to set ONOS criteria")

        self.criteria = status
        self.details = result

    def _run(self):
        """Clean old logs, run both TestON scripts and parse the results."""
        self.clean_existing_logs()
        self.set_onos_ip()
        self.run_onos_script('FUNCvirNetNB')
        self.run_onos_script('FUNCvirNetNBL3')
        self.parse_result()
170
171
class OnosSfc(OnosBase):
    """ONOS SFC test case.

    Creates the SFC image, patches ``sfc_onos.py`` with the settings of
    the deployment and then runs ``sfc.py``.
    """

    def __init__(self, **kwargs):
        """Create the case, naming it "onos_sfc" unless ``case_name`` is given.

        The name is passed to the parent constructor in the same way as
        ``Onos.__init__`` does; calling ``OnosSfc()`` without arguments
        remains valid.
        """
        if "case_name" not in kwargs:
            kwargs["case_name"] = "onos_sfc"
        super(OnosSfc, self).__init__(**kwargs)

    def get_ip(self, type):
        """Return the host name of the endpoint of service ``type``.

        The parameter keeps its original name ``type`` although it
        shadows the builtin. ``self`` was missing from the signature,
        which made every ``self.get_ip(...)`` call raise a TypeError.
        """
        url = openstack_utils.get_endpoint(service_type=type)
        logger.debug('get_ip for %s: %s', type, url)
        return urlparse.urlparse(url).hostname

    def update_sfc_onos_file(self, before, after):
        """Replace every ``before`` with ``after`` in ``sfc_onos.py``.

        The substitution is done in place by ``sed``.
        NOTE(review): neither value is escaped, so an ``after`` that
        contains '/', '&' or a quote would presumably break the sed
        expression - confirm that this cannot happen (e.g. passwords).
        """
        file_dir = os.path.join(self.onos_sfc_path, "sfc_onos.py")
        cmd = "sed -i 's/{0}/{1}/g' {2}".format(before,
                                                after,
                                                file_dir)
        ft_utils.execute_command_raise(
            cmd,
            error_msg=('Error with replacing %s with %s'
                       % (before, after)))

    def create_image(self):
        """Create the SFC Glance image.

        Raises:
            Exception: if no image ID is returned.
        """
        glance_client = openstack_utils.get_glance_client()
        image_id = openstack_utils.create_glance_image(
            glance_client,
            self.onos_sfc_image_name,
            self.onos_sfc_image_path)
        if image_id is None:
            raise Exception('Failed to create image')

        logger.debug("Image '%s' with ID=%s is created successfully.",
                     self.onos_sfc_image_name, image_id)

    def set_sfc_conf(self):
        """Fill the placeholders of ``sfc_onos.py`` with deployment values.

        NOTE(review): ``Onos.set_onos_ip`` queries the endpoint with
        service_type='network' whereas service names ("keystone",
        "neutron", ...) are passed here - confirm that
        ``openstack_utils.get_endpoint`` accepts both.
        """
        self.update_sfc_onos_file("keystone_ip", self.get_ip("keystone"))
        self.update_sfc_onos_file("neutron_ip", self.get_ip("neutron"))
        self.update_sfc_onos_file("nova_ip", self.get_ip("nova"))
        self.update_sfc_onos_file("glance_ip", self.get_ip("glance"))
        self.update_sfc_onos_file("console", CONST.OS_PASSWORD)
        neutron_client = openstack_utils.get_neutron_client()
        ext_net = openstack_utils.get_external_net(neutron_client)
        self.update_sfc_onos_file("admin_floating_net", ext_net)
        logger.debug("SFC configuration is modified")

    def sfc_test(self):
        """Run ``sfc.py`` from the ONOS SFC directory."""
        cmd = 'python {0}'.format(os.path.join(self.onos_sfc_path, 'sfc.py'))
        ft_utils.execute_command_raise(cmd,
                                       error_msg='Error with testing SFC')

    def _run(self):
        """Create the image, configure the SFC script and run the test."""
        self.create_image()
        self.set_sfc_conf()
        self.sfc_test()