fe37669a420b256e0a3f7a711061f7df85cdc3d2
[functest.git] / functest / opnfv_tests / sdn / onos / onos.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import os
11 import re
12 import subprocess
13 import shutil
14 import time
15 import urlparse
16
17 from functest.core import testcase
18 from functest.utils.constants import CONST
19 import functest.utils.functest_logger as ft_logger
20 import functest.utils.functest_utils as ft_utils
21 import functest.utils.openstack_utils as openstack_utils
22
23
24 logger = ft_logger.Logger(__name__).getLogger()
25
26
27 class OnosBase(testcase.TestCase):
28     onos_repo_path = CONST.dir_repo_onos
29     onos_sfc_image_name = CONST.onos_sfc_image_name
30     onos_sfc_image_path = os.path.join(CONST.dir_functest_data,
31                                        CONST.onos_sfc_image_file_name)
32     onos_sfc_path = os.path.join(CONST.dir_repo_functest,
33                                  CONST.dir_onos_sfc)
34
35     def run(self):
36         self.start_time = time.time()
37         try:
38             self._run()
39             res = testcase.TestCase.EX_OK
40         except Exception as e:
41             logger.error('Error with run: %s', e)
42             res = testcase.TestCase.EX_RUN_ERROR
43
44         self.stop_time = time.time()
45         return res
46
47     def _run(self):
48         raise NotImplementedError('_run is not implemented')
49
50
51 class Onos(OnosBase):
52     def __init__(self, case_name='onos'):
53         super(Onos, self).__init__(case_name)
54         self.log_path = os.path.join(self.onos_repo_path, 'TestON/logs')
55
56     def set_onos_ip(self):
57         if (CONST.INSTALLER_TYPE and
58                 CONST.INSTALLER_TYPE.lower() == 'joid'):
59             sdn_controller_env = os.getenv('SDN_CONTROLLER')
60             OC1 = re.search(r"\d+\.\d+\.\d+\.\d+", sdn_controller_env).group()
61         else:
62             neutron_url = openstack_utils.get_endpoint(service_type='network')
63             OC1 = urlparse.urlparse(neutron_url).hostname
64         os.environ['OC1'] = OC1
65         logger.debug("ONOS IP is %s" % OC1)
66
67     def run_onos_script(self, testname):
68         cli_dir = os.path.join(self.onos_repo_path, 'TestON/bin/cli.py')
69         cmd = '{0} run {1}'.format(cli_dir, testname)
70         logger.debug("Run script: %s" % testname)
71         ft_utils.execute_command_raise(
72             cmd,
73             error_msg=('Error when running ONOS script: %s'
74                        % (testname)))
75
76     def clean_existing_logs(self):
77         log_dir = [f for f in os.listdir(self.log_path)]
78         for log in log_dir:
79             try:
80                 if os.path.isdir(log):
81                     shutil.rmtree(log)
82                 elif os.path.isfile(log):
83                     os.remove(log)
84             except OSError as e:
85                 logger.error('Error with deleting file %s: %s',
86                              log, e.strerror)
87
88     def get_result(self):
89         cmd = 'grep -rnh Fail {0}'.format(self.log_path)
90         p = subprocess.Popen(cmd,
91                              shell=True,
92                              stdout=subprocess.PIPE,
93                              stderr=subprocess.STDOUT)
94
95         for line in p.stdout:
96             logger.debug(line)
97             if re.search("\s+[1-9]+\s+", line):
98                 logger.debug("Testcase Fails\n" + line)
99
100         cmd = "grep -rnh 'Execution Time' {0}".format(self.log_path)
101         result_buffer = os.popen(cmd).read()
102         time1 = result_buffer[114:128]
103         time2 = result_buffer[28:42]
104         cmd = "grep -rnh 'Success Percentage' {0}".format(
105             os.path.join(self.log_path, "FUNCvirNetNB_*"))
106         result_buffer = os.popen(cmd).read()
107         if result_buffer.find('100%') >= 0:
108             result1 = 'Success'
109         else:
110             result1 = 'Failed'
111         cmd = "grep -rnh 'Success Percentage' {0}".format(
112             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
113         result_buffer = os.popen(cmd).read()
114         if result_buffer.find('100%') >= 0:
115             result2 = 'Success'
116         else:
117             result2 = 'Failed'
118         status1 = []
119         status2 = []
120         cmd = "grep -rnh 'h3' {0}".format(
121             os.path.join(self.log_path, "FUNCvirNetNB_*"))
122         result_buffer = os.popen(cmd).read()
123         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
124         # res = pattern.search(result_buffer).groups()
125         res = pattern.findall(result_buffer)
126         i = 0
127         for index in range(len(res)):
128             status1.append({'Case name:': res[i][0] + res[i][1],
129                             'Case result': res[i][2]})
130             i = i + 1
131         cmd = "grep -rnh 'h3' {0}".format(
132             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
133         result_buffer = os.popen(cmd).read()
134         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
135         res = pattern.findall(result_buffer)
136         i = 0
137         for index in range(len(res)):
138             status2.append({'Case name:': res[i][0] + res[i][1],
139                             'Case result': res[i][2]})
140             i = i + 1
141         payload = {'FUNCvirNet': {'duration': time1,
142                                   'result': result1,
143                                   'status': status1},
144                    'FUNCvirNetL3': {'duration': time2,
145                                     'result': result2,
146                                     'status': status2}}
147         return payload
148
149     def parse_result(self):
150         result = self.get_result()
151         status = "FAIL"
152         try:
153             if (result['FUNCvirNet']['result'] == "Success" and
154                     result['FUNCvirNetL3']['result'] == "Success"):
155                 status = "PASS"
156         except:
157             logger.error("Unable to set ONOS criteria")
158
159         self.criteria = status
160         self.details = result
161
162     def _run(self):
163         self.clean_existing_logs()
164         self.set_onos_ip()
165         self.run_onos_script('FUNCvirNetNB')
166         self.run_onos_script('FUNCvirNetNBL3')
167         self.parse_result()
168
169
170 class OnosSfc(OnosBase):
171     def __init__(self):
172         super(OnosSfc, self).__init__()
173         self.case_name = 'onos_sfc'
174
175     def get_ip(type):
176         url = openstack_utils.get_endpoint(service_type=type)
177         logger.debug('get_ip for %s: %s' % (type, url))
178         return urlparse.urlparse(url).hostname
179
180     def update_sfc_onos_file(self, before, after):
181         file_dir = os.path.join(self.onos_sfc_path, "sfc_onos.py")
182         cmd = "sed -i 's/{0}/{1}/g' {2}".format(before,
183                                                 after,
184                                                 file_dir)
185         ft_utils.execute_command_raise(
186             cmd,
187             error_msg=('Error with replacing %s with %s'
188                        % (before, after)))
189
190     def create_image(self):
191         glance_client = openstack_utils.get_glance_client()
192         image_id = openstack_utils.create_glance_image(
193             glance_client,
194             self.onos_sfc_image_name,
195             self.onos_sfc_image_path)
196         if image_id is None:
197             raise Exception('Failed to create image')
198
199         logger.debug("Image '%s' with ID=%s is created successfully."
200                      % (self.onos_sfc_image_name, image_id))
201
202     def set_sfc_conf(self):
203         self.update_sfc_onos_file("keystone_ip", self.get_ip("keystone"))
204         self.update_sfc_onos_file("neutron_ip", self.get_ip("neutron"))
205         self.update_sfc_onos_file("nova_ip", self.get_ip("nova"))
206         self.update_sfc_onos_file("glance_ip", self.get_ip("glance"))
207         self.update_sfc_onos_file("console", CONST.OS_PASSWORD)
208         neutron_client = openstack_utils.get_neutron_client()
209         ext_net = openstack_utils.get_external_net(neutron_client)
210         self.update_sfc_onos_file("admin_floating_net", ext_net)
211         logger.debug("SFC configuration is modified")
212
213     def sfc_test(self):
214         cmd = 'python {0}'.format(os.path.join(self.onos_sfc_path, 'sfc.py'))
215         ft_utils.execute_command_raise(cmd,
216                                        error_msg='Error with testing SFC')
217
218     def _run(self):
219         self.create_image()
220         self.set_sfc_conf()
221         self.sfc_test()