Merge "Config test_accounts_file for refstack_defcore"
[functest.git] / functest / opnfv_tests / sdn / onos / onos.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2017 HUAWEI TECHNOLOGIES CO.,LTD and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import logging
11 import os
12 import pkg_resources
13 import re
14 import subprocess
15 import shutil
16 import time
17 import urlparse
18
19 from functest.core import testcase
20 from functest.utils.constants import CONST
21 import functest.utils.functest_utils as ft_utils
22 import functest.utils.openstack_utils as openstack_utils
23
24
25 class OnosBase(testcase.TestCase):
26     onos_repo_path = CONST.__getattribute__('dir_repo_onos')
27     onos_sfc_image_name = CONST.__getattribute__('onos_sfc_image_name')
28     onos_sfc_image_path = os.path.join(
29         CONST.__getattribute__('dir_functest_images'),
30         CONST.__getattribute__('onos_sfc_image_file_name'))
31     onos_sfc_path = pkg_resources.resource_filename(
32         'functest', 'opnfv_tests/sdn/onos/sfc')
33     installer_type = CONST.__getattribute__('INSTALLER_TYPE')
34     logger = logging.getLogger(__name__)
35
36     def __init__(self, **kwargs):
37         if "case_name" not in kwargs:
38             kwargs["case_name"] = "onos_base"
39         super(OnosBase, self).__init__(**kwargs)
40
41     def run(self):
42         self.start_time = time.time()
43         try:
44             self._run()
45             res = testcase.TestCase.EX_OK
46         except Exception as e:
47             self.logger.error('Error with run: %s', e)
48             res = testcase.TestCase.EX_RUN_ERROR
49
50         self.stop_time = time.time()
51         return res
52
53     def _run(self):
54         raise NotImplementedError('_run is not implemented')
55
56
57 class Onos(OnosBase):
58     def __init__(self, **kwargs):
59         if "case_name" not in kwargs:
60             kwargs["case_name"] = "onos"
61         super(Onos, self).__init__(**kwargs)
62         self.log_path = os.path.join(self.onos_repo_path, 'TestON/logs')
63
64     def set_onos_ip(self):
65         if (self.installer_type and
66                 self.installer_type.lower() == 'joid'):
67             sdn_controller_env = os.getenv('SDN_CONTROLLER')
68             OC1 = re.search(r"\d+\.\d+\.\d+\.\d+", sdn_controller_env).group()
69         else:
70             neutron_url = openstack_utils.get_endpoint(service_type='network')
71             OC1 = urlparse.urlparse(neutron_url).hostname
72         os.environ['OC1'] = OC1
73         self.logger.debug("ONOS IP is %s", OC1)
74
75     def run_onos_script(self, testname):
76         cli_dir = os.path.join(self.onos_repo_path, 'TestON/bin/cli.py')
77         cmd = '{0} run {1}'.format(cli_dir, testname)
78         self.logger.debug("Run script: %s", testname)
79         ft_utils.execute_command_raise(
80             cmd,
81             error_msg=('Error when running ONOS script: %s'
82                        % (testname)))
83
84     def clean_existing_logs(self):
85         log_dir = [f for f in os.listdir(self.log_path)]
86         for log in log_dir:
87             try:
88                 if os.path.isdir(log):
89                     shutil.rmtree(log)
90                 elif os.path.isfile(log):
91                     os.remove(log)
92             except OSError as e:
93                 self.logger.error('Error with deleting file %s: %s',
94                                   log, e.strerror)
95
96     def get_result(self):
97         cmd = 'grep -rnh Fail {0}'.format(self.log_path)
98         p = subprocess.Popen(cmd,
99                              shell=True,
100                              stdout=subprocess.PIPE,
101                              stderr=subprocess.STDOUT)
102
103         for line in p.stdout:
104             self.logger.debug(line)
105             if re.search("\s+[1-9]+\s+", line):
106                 self.logger.debug("Testcase Fails\n" + line)
107
108         cmd = "grep -rnh 'Execution Time' {0}".format(self.log_path)
109         result_buffer = os.popen(cmd).read()
110         time1 = result_buffer[114:128]
111         time2 = result_buffer[28:42]
112         cmd = "grep -rnh 'Success Percentage' {0}".format(
113             os.path.join(self.log_path, "FUNCvirNetNB_*"))
114         result_buffer = os.popen(cmd).read()
115         if result_buffer.find('100%') >= 0:
116             result1 = 'Success'
117         else:
118             result1 = 'Failed'
119         cmd = "grep -rnh 'Success Percentage' {0}".format(
120             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
121         result_buffer = os.popen(cmd).read()
122         if result_buffer.find('100%') >= 0:
123             result2 = 'Success'
124         else:
125             result2 = 'Failed'
126         status1 = []
127         status2 = []
128         cmd = "grep -rnh 'h3' {0}".format(
129             os.path.join(self.log_path, "FUNCvirNetNB_*"))
130         result_buffer = os.popen(cmd).read()
131         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
132         # res = pattern.search(result_buffer).groups()
133         res = pattern.findall(result_buffer)
134         i = 0
135         for index in range(len(res)):
136             status1.append({'Case name:': res[i][0] + res[i][1],
137                             'Case result': res[i][2]})
138             i = i + 1
139         cmd = "grep -rnh 'h3' {0}".format(
140             os.path.join(self.log_path, "FUNCvirNetNBL3*"))
141         result_buffer = os.popen(cmd).read()
142         pattern = re.compile("<h3>([^-]+) - ([^-]+) - (\S*)</h3>")
143         res = pattern.findall(result_buffer)
144         i = 0
145         for index in range(len(res)):
146             status2.append({'Case name:': res[i][0] + res[i][1],
147                             'Case result': res[i][2]})
148             i = i + 1
149         payload = {'FUNCvirNet': {'duration': time1,
150                                   'result': result1,
151                                   'status': status1},
152                    'FUNCvirNetL3': {'duration': time2,
153                                     'result': result2,
154                                     'status': status2}}
155         return payload
156
157     def parse_result(self):
158         result = self.get_result()
159         status = "FAIL"
160         try:
161             if (result['FUNCvirNet']['result'] == "Success" and
162                     result['FUNCvirNetL3']['result'] == "Success"):
163                 status = "PASS"
164         except Exception:
165             self.logger.error("Unable to set ONOS result")
166
167         self.result = status
168         self.details = result
169
170     def _run(self):
171         self.clean_existing_logs()
172         self.set_onos_ip()
173         self.run_onos_script('FUNCvirNetNB')
174         self.run_onos_script('FUNCvirNetNBL3')
175         self.parse_result()
176
177
178 class OnosSfc(OnosBase):
179     def __init__(self, **kwargs):
180         if "case_name" not in kwargs:
181             kwargs["case_name"] = "onos_sfc"
182         super(OnosSfc, self).__init__(**kwargs)
183
184     def get_ip(self, type):
185         url = openstack_utils.get_endpoint(service_type=type)
186         self.logger.debug('get_ip for %s: %s', type, url)
187         return urlparse.urlparse(url).hostname
188
189     def update_sfc_onos_file(self, before, after):
190         file_dir = os.path.join(self.onos_sfc_path, "sfc_onos.py")
191         cmd = "sed -i 's/{0}/{1}/g' {2}".format(before,
192                                                 after,
193                                                 file_dir)
194         ft_utils.execute_command_raise(
195             cmd,
196             error_msg=('Error with replacing %s with %s'
197                        % (before, after)))
198
199     def create_image(self):
200         self.logger.warn('inside create_image')
201         glance_client = openstack_utils.get_glance_client()
202         image_id = openstack_utils.create_glance_image(
203             glance_client,
204             self.onos_sfc_image_name,
205             self.onos_sfc_image_path)
206         if image_id is None:
207             raise Exception('Failed to create image')
208
209         self.logger.debug("Image '%s' with ID=%s is created successfully.",
210                           self.onos_sfc_image_name, image_id)
211
212     def set_sfc_conf(self):
213         self.update_sfc_onos_file("keystone_ip", self.get_ip("identity"))
214         self.update_sfc_onos_file("neutron_ip", self.get_ip("network"))
215         self.update_sfc_onos_file("nova_ip", self.get_ip("compute"))
216         self.update_sfc_onos_file("glance_ip", self.get_ip("image"))
217         self.update_sfc_onos_file("console",
218                                   CONST.__getattribute__('OS_PASSWORD'))
219         neutron_client = openstack_utils.get_neutron_client()
220         ext_net = openstack_utils.get_external_net(neutron_client)
221         self.update_sfc_onos_file("admin_floating_net", ext_net)
222         self.logger.debug("SFC configuration is modified")
223
224     def sfc_test(self):
225         cmd = 'python {0}'.format(os.path.join(self.onos_sfc_path, 'sfc.py'))
226         ft_utils.execute_command_raise(cmd,
227                                        error_msg='Error with testing SFC')
228
229     def _run(self):
230         self.create_image()
231         self.set_sfc_conf()
232         self.sfc_test()