Delete functest.utils.functest_logger
[functest.git] / functest / opnfv_tests / sdn / onos / onos.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import logging
11 import os
12 import re
13 import subprocess
14 import shutil
15 import time
16 import urlparse
17
18 from functest.core import testcase
19 from functest.utils.constants import CONST
20 import functest.utils.functest_utils as ft_utils
21 import functest.utils.openstack_utils as openstack_utils
22
23
24 class OnosBase(testcase.TestCase):
25     onos_repo_path = CONST.__getattribute__('dir_repo_onos')
26     onos_sfc_image_name = CONST.__getattribute__('onos_sfc_image_name')
27     onos_sfc_image_path = os.path.join(
28         CONST.__getattribute__('dir_functest_data'),
29         CONST.__getattribute__('onos_sfc_image_file_name'))
30     onos_sfc_path = os.path.join(CONST.__getattribute__('dir_repo_functest'),
31                                  CONST.__getattribute__('dir_onos_sfc'))
32     installer_type = CONST.__getattribute__('INSTALLER_TYPE')
33     logger = logging.getLogger(__name__)
34
35     def __init__(self, **kwargs):
36         if "case_name" not in kwargs:
37             kwargs["case_name"] = "onos_base"
38         super(OnosBase, self).__init__(**kwargs)
39
40     def run(self):
41         self.start_time = time.time()
42         try:
43             self._run()
44             res = testcase.TestCase.EX_OK
45         except Exception as e:
46             self.logger.error('Error with run: %s', e)
47             res = testcase.TestCase.EX_RUN_ERROR
48
49         self.stop_time = time.time()
50         return res
51
52     def _run(self):
53         raise NotImplementedError('_run is not implemented')
54
55
56 class Onos(OnosBase):
57     def __init__(self, **kwargs):
58         if "case_name" not in kwargs:
59             kwargs["case_name"] = "onos"
60         super(Onos, self).__init__(**kwargs)
61         self.log_path = os.path.join(self.onos_repo_path, 'TestON/logs')
62
63     def set_onos_ip(self):
64         if (self.installer_type and
65                 self.installer_type.lower() == 'joid'):
66             sdn_controller_env = os.getenv('SDN_CONTROLLER')
67             OC1 = re.search(r"\d+\.\d+\.\d+\.\d+", sdn_controller_env).group()
68         else:
69             neutron_url = openstack_utils.get_endpoint(service_type='network')
70             OC1 = urlparse.urlparse(neutron_url).hostname
71         os.environ['OC1'] = OC1
72         self.logger.debug("ONOS IP is %s", OC1)
73
74     def run_onos_script(self, testname):
75         cli_dir = os.path.join(self.onos_repo_path, 'TestON/bin/cli.py')
76         cmd = '{0} run {1}'.format(cli_dir, testname)
77         self.logger.debug("Run script: %s", testname)
78         ft_utils.execute_command_raise(
79             cmd,
80             error_msg=('Error when running ONOS script: %s'
81                        % (testname)))
82
83     def clean_existing_logs(self):
84         log_dir = [f for f in os.listdir(self.log_path)]
85         for log in log_dir:
86             try:
87                 if os.path.isdir(log):
88                     shutil.rmtree(log)
89                 elif os.path.isfile(log):
90                     os.remove(log)
91             except OSError as e:
92                 self.logger.error('Error with deleting file %s: %s',
93                                   log, e.strerror)
94
95     def get_result(self):
96         cmd = 'grep -rnh Fail {0}'.format(self.log_path)
97         p = subprocess.Popen(cmd,
98                              shell=True,
99                              stdout=subprocess.PIPE,
100                              stderr=subprocess.STDOUT)
101
102         for line in p.stdout:
103             self.logger.debug(line)
104             if re.search("\s+[1-9]+\s+", line):
105                 self.logger.debug("Testcase Fails\n" + line)
106
107         cmd = "grep -rnh 'Execution Time' {0}".format(self.log_path)
108         result_buffer = os.popen(cmd).read()
109         time1 = result_buffer[114:128]
110         time2 = result_buffer[28:42]
111         cmd = "grep -rnh 'Success Percentage' {0}".format(
112             os.path.join(self.log_path, "FUNCvirNetNB_*"))
113         result_buffer = os.popen(cmd).read()
114         if result_buffer.find('100%') >= 0:
115             result1 = 'Success'
116         else:
117             result1 = 'Failed'
118         cmd = "grep -rnh 'Success Percentage' {0}".format(
119             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
120         result_buffer = os.popen(cmd).read()
121         if result_buffer.find('100%') >= 0:
122             result2 = 'Success'
123         else:
124             result2 = 'Failed'
125         status1 = []
126         status2 = []
127         cmd = "grep -rnh 'h3' {0}".format(
128             os.path.join(self.log_path, "FUNCvirNetNB_*"))
129         result_buffer = os.popen(cmd).read()
130         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
131         # res = pattern.search(result_buffer).groups()
132         res = pattern.findall(result_buffer)
133         i = 0
134         for index in range(len(res)):
135             status1.append({'Case name:': res[i][0] + res[i][1],
136                             'Case result': res[i][2]})
137             i = i + 1
138         cmd = "grep -rnh 'h3' {0}".format(
139             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
140         result_buffer = os.popen(cmd).read()
141         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
142         res = pattern.findall(result_buffer)
143         i = 0
144         for index in range(len(res)):
145             status2.append({'Case name:': res[i][0] + res[i][1],
146                             'Case result': res[i][2]})
147             i = i + 1
148         payload = {'FUNCvirNet': {'duration': time1,
149                                   'result': result1,
150                                   'status': status1},
151                    'FUNCvirNetL3': {'duration': time2,
152                                     'result': result2,
153                                     'status': status2}}
154         return payload
155
156     def parse_result(self):
157         result = self.get_result()
158         status = "FAIL"
159         try:
160             if (result['FUNCvirNet']['result'] == "Success" and
161                     result['FUNCvirNetL3']['result'] == "Success"):
162                 status = "PASS"
163         except Exception:
164             self.logger.error("Unable to set ONOS result")
165
166         self.result = status
167         self.details = result
168
169     def _run(self):
170         self.clean_existing_logs()
171         self.set_onos_ip()
172         self.run_onos_script('FUNCvirNetNB')
173         self.run_onos_script('FUNCvirNetNBL3')
174         self.parse_result()
175
176
177 class OnosSfc(OnosBase):
178     def __init__(self, **kwargs):
179         if "case_name" not in kwargs:
180             kwargs["case_name"] = "onos_sfc"
181         super(OnosSfc, self).__init__(**kwargs)
182
183     def get_ip(self, type):
184         url = openstack_utils.get_endpoint(service_type=type)
185         self.logger.debug('get_ip for %s: %s', type, url)
186         return urlparse.urlparse(url).hostname
187
188     def update_sfc_onos_file(self, before, after):
189         file_dir = os.path.join(self.onos_sfc_path, "sfc_onos.py")
190         cmd = "sed -i 's/{0}/{1}/g' {2}".format(before,
191                                                 after,
192                                                 file_dir)
193         ft_utils.execute_command_raise(
194             cmd,
195             error_msg=('Error with replacing %s with %s'
196                        % (before, after)))
197
198     def create_image(self):
199         self.logger.warn('inside create_image')
200         glance_client = openstack_utils.get_glance_client()
201         image_id = openstack_utils.create_glance_image(
202             glance_client,
203             self.onos_sfc_image_name,
204             self.onos_sfc_image_path)
205         if image_id is None:
206             raise Exception('Failed to create image')
207
208         self.logger.debug("Image '%s' with ID=%s is created successfully.",
209                           self.onos_sfc_image_name, image_id)
210
211     def set_sfc_conf(self):
212         self.update_sfc_onos_file("keystone_ip", self.get_ip("keystone"))
213         self.update_sfc_onos_file("neutron_ip", self.get_ip("neutron"))
214         self.update_sfc_onos_file("nova_ip", self.get_ip("nova"))
215         self.update_sfc_onos_file("glance_ip", self.get_ip("glance"))
216         self.update_sfc_onos_file("console",
217                                   CONST.__getattribute__('OS_PASSWORD'))
218         neutron_client = openstack_utils.get_neutron_client()
219         ext_net = openstack_utils.get_external_net(neutron_client)
220         self.update_sfc_onos_file("admin_floating_net", ext_net)
221         self.logger.debug("SFC configuration is modified")
222
223     def sfc_test(self):
224         cmd = 'python {0}'.format(os.path.join(self.onos_sfc_path, 'sfc.py'))
225         ft_utils.execute_command_raise(cmd,
226                                        error_msg='Error with testing SFC')
227
228     def _run(self):
229         self.create_image()
230         self.set_sfc_conf()
231         self.sfc_test()