Merge "More Unit Tests for tempest and IMS module"
[functest.git] / functest / opnfv_tests / sdn / onos / onos.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import os
11 import re
12 import subprocess
13 import shutil
14 import time
15 import urlparse
16
17 from functest.core import testcase
18 from functest.utils.constants import CONST
19 import functest.utils.functest_logger as ft_logger
20 import functest.utils.functest_utils as ft_utils
21 import functest.utils.openstack_utils as openstack_utils
22
23
# Module-level logger used by OnosBase, Onos and OnosSfc below.
24 logger = ft_logger.Logger(__name__).getLogger()
25
26
class OnosBase(testcase.TestCase):
    """Shared scaffolding for the ONOS test cases defined in this module.

    The class attributes are paths and names read from ``CONST``.
    Subclasses provide the actual test logic in ``_run()``; ``run()``
    wraps it with timing and error handling.
    """

    # Location of the ONOS repository (CONST.dir_repo_onos).
    onos_repo_path = CONST.dir_repo_onos
    # Name and on-disk location of the image used by the SFC test case.
    onos_sfc_image_name = CONST.onos_sfc_image_name
    onos_sfc_image_path = os.path.join(CONST.dir_functest_data,
                                       CONST.onos_sfc_image_file_name)
    # Directory, inside the functest repository, holding the SFC scripts.
    onos_sfc_path = os.path.join(CONST.dir_repo_functest,
                                 CONST.dir_onos_sfc)

    def __init__(self):
        """Initialise the underlying ``testcase.TestCase``."""
        super(OnosBase, self).__init__()

    def run(self):
        """Execute ``_run()`` and translate its outcome into an exit code.

        ``start_time`` is recorded before ``_run()`` is called and
        ``stop_time`` after it returned or raised an ``Exception``.

        Returns:
            ``testcase.TestCase.EX_OK`` when ``_run()`` completes,
            ``testcase.TestCase.EX_RUN_ERROR`` when it raises an
            ``Exception`` (the error is logged, not propagated).
        """
        self.start_time = time.time()
        try:
            self._run()
        except Exception as err:
            logger.error('Error with run: %s', err)
            status = testcase.TestCase.EX_RUN_ERROR
        else:
            status = testcase.TestCase.EX_OK

        self.stop_time = time.time()
        return status

    def _run(self):
        """Test body hook; concrete test cases must override it.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError('_run is not implemented')
52
53
class Onos(OnosBase):
    """Run the TestON suites FUNCvirNetNB and FUNCvirNetNBL3 against ONOS.

    ``_run()`` removes previous TestON logs, exports the controller IP as
    ``OC1``, runs both suites through ``TestON/bin/cli.py`` and finally
    parses the generated logs into ``self.criteria`` / ``self.details``.
    """

    # One match per "<h3>A - B - C</h3>" entry found in the TestON logs;
    # the groups are the two parts of the case name and the case result.
    _CASE_PATTERN = re.compile(r"<h3>([^-]+) - ([^-]+) - (\S*)</h3>")

    def __init__(self):
        """Set the case name and the directory holding the TestON logs."""
        super(Onos, self).__init__()
        self.case_name = 'onos'
        self.log_path = os.path.join(self.onos_repo_path, 'TestON/logs')

    def set_onos_ip(self):
        """Export the ONOS controller IP as the ``OC1`` environment variable.

        With the ``joid`` installer the first IPv4 address found in the
        ``SDN_CONTROLLER`` environment variable is used; otherwise the
        hostname of the ``network`` service endpoint is used.

        Raises:
            Exception: if no controller address can be determined.
        """
        if (CONST.INSTALLER_TYPE and
                CONST.INSTALLER_TYPE.lower() == 'joid'):
            sdn_controller_env = os.getenv('SDN_CONTROLLER')
            # SDN_CONTROLLER may be unset (None) or may not hold an IPv4
            # address; fail with an explicit message in both cases.
            match = re.search(r"\d+\.\d+\.\d+\.\d+", sdn_controller_env or '')
            if match is None:
                raise Exception('No IPv4 address found in SDN_CONTROLLER: %r'
                                % (sdn_controller_env,))
            OC1 = match.group()
        else:
            neutron_url = openstack_utils.get_endpoint(service_type='network')
            OC1 = urlparse.urlparse(neutron_url).hostname
            if not OC1:
                raise Exception('No hostname found in network endpoint: %r'
                                % (neutron_url,))
        os.environ['OC1'] = OC1
        logger.debug("ONOS IP is %s", OC1)

    def run_onos_script(self, testname):
        """Run one TestON suite through ``TestON/bin/cli.py run <testname>``.

        Args:
            testname: name of the TestON suite to execute.

        Raises:
            Whatever ``ft_utils.execute_command_raise`` raises when the
            command fails.
        """
        cli_dir = os.path.join(self.onos_repo_path, 'TestON/bin/cli.py')
        cmd = '{0} run {1}'.format(cli_dir, testname)
        logger.debug("Run script: %s", testname)
        ft_utils.execute_command_raise(
            cmd,
            error_msg=('Error when running ONOS script: %s'
                       % (testname)))

    def clean_existing_logs(self):
        """Delete every file and directory found directly in ``log_path``.

        Deletion errors are logged and do not stop the clean-up. An
        unreadable or missing ``log_path`` still raises ``OSError`` from
        ``os.listdir``.
        """
        for name in os.listdir(self.log_path):
            # os.listdir() returns bare entry names: they must be joined
            # with log_path, otherwise they are resolved against the
            # current working directory and nothing gets removed.
            path = os.path.join(self.log_path, name)
            try:
                if os.path.isdir(path):
                    shutil.rmtree(path)
                elif os.path.isfile(path):
                    os.remove(path)
            except OSError as e:
                logger.error('Error with deleting file %s: %s',
                             path, e.strerror)

    def _log_failures(self):
        """Log every log line containing 'Fail', flagging non-zero counts."""
        cmd = 'grep -rnh Fail {0}'.format(self.log_path)
        p = subprocess.Popen(cmd,
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        try:
            for line in p.stdout:
                logger.debug(line)
                # A whitespace-delimited number not starting with 0;
                # "\d*" lets counts such as 10 or 100 match as well.
                if re.search(r"\s+[1-9]\d*\s+", line):
                    logger.debug("Testcase Fails\n" + line)
        finally:
            # Release the pipe and reap the child process.
            p.stdout.close()
            p.wait()

    @staticmethod
    def _grep_logs(pattern, target):
        """Return the output of ``grep -rnh '<pattern>' <target>``.

        The command goes through the shell on purpose: ``target`` may
        contain a glob such as ``FUNCvirNetNB_*``.
        """
        cmd = "grep -rnh '{0}' {1}".format(pattern, target)
        pipe = os.popen(cmd)
        try:
            return pipe.read()
        finally:
            pipe.close()

    def _suite_result(self, target):
        """Return 'Success' if a 'Success Percentage' line shows 100%."""
        output = self._grep_logs('Success Percentage', target)
        return 'Success' if '100%' in output else 'Failed'

    def _suite_status(self, target):
        """Return one name/result dict per ``<h3>`` case entry in the logs."""
        output = self._grep_logs('h3', target)
        return [{'Case name:': part1 + part2, 'Case result': outcome}
                for part1, part2, outcome
                in self._CASE_PATTERN.findall(output)]

    def get_result(self):
        """Build the result payload from the TestON logs.

        Returns:
            A dict with the keys ``FUNCvirNet`` and ``FUNCvirNetL3``, each
            mapping to a dict with ``duration``, ``result`` ('Success' or
            'Failed') and ``status`` (list of per-case dicts).
        """
        self._log_failures()

        nb_logs = os.path.join(self.log_path, "FUNCvirNetNB_*")
        l3_logs = os.path.join(self.log_path, "FUNCvirNetNBL3*")

        # NOTE(review): the durations are taken at fixed character offsets
        # of the grep output, which presumably relies on a stable log
        # layout and ordering -- confirm against real TestON logs.
        execution_times = self._grep_logs('Execution Time', self.log_path)
        time1 = execution_times[114:128]
        time2 = execution_times[28:42]

        result1 = self._suite_result(nb_logs)
        result2 = self._suite_result(l3_logs)
        status1 = self._suite_status(nb_logs)
        status2 = self._suite_status(l3_logs)

        payload = {'FUNCvirNet': {'duration': time1,
                                  'result': result1,
                                  'status': status1},
                   'FUNCvirNetL3': {'duration': time2,
                                    'result': result2,
                                    'status': status2}}
        return payload

    def parse_result(self):
        """Set ``criteria`` and ``details`` from the parsed TestON logs.

        ``criteria`` is "PASS" only when both suites report 'Success',
        "FAIL" otherwise; ``details`` is the payload of ``get_result()``.
        """
        result = self.get_result()
        status = "FAIL"
        try:
            if (result['FUNCvirNet']['result'] == "Success" and
                    result['FUNCvirNetL3']['result'] == "Success"):
                status = "PASS"
        except (KeyError, TypeError):
            # Malformed payload: keep the default "FAIL" status.
            logger.error("Unable to set ONOS criteria")

        self.criteria = status
        self.details = result

    def _run(self):
        """Clean old logs, run both TestON suites and parse their results."""
        self.clean_existing_logs()
        self.set_onos_ip()
        self.run_onos_script('FUNCvirNetNB')
        self.run_onos_script('FUNCvirNetNBL3')
        self.parse_result()
172
173
class OnosSfc(OnosBase):
    """ONOS SFC test case.

    ``_run()`` uploads the SFC image to Glance, rewrites the placeholders
    of ``sfc_onos.py`` with values from the deployment and then runs
    ``sfc.py``.
    """

    def __init__(self):
        """Set the case name."""
        super(OnosSfc, self).__init__()
        self.case_name = 'onos_sfc'

    def get_ip(self, type):
        """Return the hostname of the endpoint of the given service.

        The method was previously declared without ``self``, which made
        every ``self.get_ip(...)`` call fail with ``TypeError``.

        Args:
            type: value passed as ``service_type`` to
                ``openstack_utils.get_endpoint``. The name shadows the
                builtin but is kept for backward compatibility.

        Returns:
            The ``hostname`` attribute of the parsed endpoint URL.
        """
        url = openstack_utils.get_endpoint(service_type=type)
        logger.debug('get_ip for %s: %s', type, url)
        return urlparse.urlparse(url).hostname

    def update_sfc_onos_file(self, before, after):
        """Replace every occurrence of ``before`` in ``sfc_onos.py``.

        The substitution is done in place with ``sed -i 's/.../.../g'``.

        Args:
            before: text to search for (interpreted by sed as a pattern).
            after: replacement text.

        Raises:
            Whatever ``ft_utils.execute_command_raise`` raises when the
            command fails.
        """
        file_dir = os.path.join(self.onos_sfc_path, "sfc_onos.py")
        # NOTE(review): before/after are inserted verbatim into the sed
        # script; a value containing "/", "&" or a single quote (e.g. a
        # password) would break or alter the command -- consider escaping.
        cmd = "sed -i 's/{0}/{1}/g' {2}".format(before,
                                                after,
                                                file_dir)
        ft_utils.execute_command_raise(
            cmd,
            error_msg=('Error with replacing %s with %s'
                       % (before, after)))

    def create_image(self):
        """Create the SFC image in Glance.

        Raises:
            Exception: if ``create_glance_image`` returns ``None``.
        """
        glance_client = openstack_utils.get_glance_client()
        image_id = openstack_utils.create_glance_image(
            glance_client,
            self.onos_sfc_image_name,
            self.onos_sfc_image_path)
        if image_id is None:
            raise Exception('Failed to create image')

        logger.debug("Image '%s' with ID=%s is created successfully.",
                     self.onos_sfc_image_name, image_id)

    def set_sfc_conf(self):
        """Fill the placeholders of ``sfc_onos.py`` for this deployment."""
        # NOTE(review): these values are passed as service_type to
        # get_endpoint(), while set_onos_ip() uses 'network' -- confirm
        # that service names are accepted here.
        endpoint_placeholders = (("keystone_ip", "keystone"),
                                 ("neutron_ip", "neutron"),
                                 ("nova_ip", "nova"),
                                 ("glance_ip", "glance"))
        for placeholder, service in endpoint_placeholders:
            self.update_sfc_onos_file(placeholder, self.get_ip(service))
        self.update_sfc_onos_file("console", CONST.OS_PASSWORD)
        neutron_client = openstack_utils.get_neutron_client()
        ext_net = openstack_utils.get_external_net(neutron_client)
        self.update_sfc_onos_file("admin_floating_net", ext_net)
        logger.debug("SFC configuration is modified")

    def sfc_test(self):
        """Run ``sfc.py`` from the SFC directory with ``python``.

        Raises:
            Whatever ``ft_utils.execute_command_raise`` raises when the
            command fails.
        """
        cmd = 'python {0}'.format(os.path.join(self.onos_sfc_path, 'sfc.py'))
        ft_utils.execute_command_raise(cmd,
                                       error_msg='Error with testing SFC')

    def _run(self):
        """Create the image, configure ``sfc_onos.py`` and run the test."""
        self.create_image()
        self.set_sfc_conf()
        self.sfc_test()