Merge "NSB installation fails at Yardstick GUI Ubuntu 18"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 import errno
17 from itertools import chain
18 import logging
19 import multiprocessing
20 import os
21 import os.path
22 import re
23
24 import jinja2
25 import pkg_resources
26 from oslo_config import cfg
27 from oslo_utils.encodeutils import safe_decode
28
29 from yardstick import ssh
30 from yardstick.common.exceptions import ResourceCommandError
31 from yardstick.common.task_template import finalize_for_yaml
32 from yardstick.common.utils import validate_non_string_sequence
33 from yardstick.network_services.nfvi.collectd import AmqpConsumer
34 from yardstick.benchmark.contexts import heat
35
36
37 LOG = logging.getLogger(__name__)
38
39 CONF = cfg.CONF
40 ZMQ_OVS_PORT = 5567
41 ZMQ_POLLING_TIME = 12000
42 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
43                         "hugepages"]
44
45
46 class ResourceProfile(object):
47     """
48     This profile adds a resource at the beginning of the test session
49     """
50     COLLECTD_CONF = "collectd.conf"
51     AMPQ_PORT = 5672
52     DEFAULT_INTERVAL = 25
53     DEFAULT_TIMEOUT = 3600
54     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
55
56     def __init__(self, mgmt, port_names=None, plugins=None,
57                  interval=None, timeout=None, reset_mq_flag=True):
58
59         if plugins is None:
60             self.plugins = {}
61         else:
62             self.plugins = plugins
63
64         if interval is None:
65             self.interval = self.DEFAULT_INTERVAL
66         else:
67             self.interval = interval
68
69         if timeout is None:
70             self.timeout = self.DEFAULT_TIMEOUT
71         else:
72             self.timeout = timeout
73
74         self.enable = True
75         self._queue = multiprocessing.Queue()
76         self.amqp_client = None
77         self.port_names = validate_non_string_sequence(port_names, default=[])
78
79         # we need to save mgmt so we can connect to port 5672
80         self.mgmt = mgmt
81         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
82         self._reset_mq_flag = reset_mq_flag
83
84     @classmethod
85     def make_from_node(cls, node, timeout):
86         # node dict works as mgmt dict
87         # don't need port names, there is no way we can
88         # tell what port is used on the compute node
89         collectd_options = node["collectd"]
90         plugins = collectd_options.get("plugins", {})
91         interval = collectd_options.get("interval")
92
93         reset_mq_flag = (False if node.get("ctx_type") == heat.HeatContext.__context_type__
94                           else True)
95         return cls(node, plugins=plugins, interval=interval,
96                    timeout=timeout, reset_mq_flag=reset_mq_flag)
97
98     def check_if_system_agent_running(self, process):
99         """ verify if system agent is running """
100         try:
101             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
102             # strip whitespace
103             return err, pid.strip()
104         except OSError as e:
105             if e.errno in {errno.ECONNRESET}:
106                 # if we can't connect to check, then we won't be able to connect to stop it
107                 LOG.exception("Can't connect to host to check %s status", process)
108                 return 1, None
109             raise
110
111     def run_collectd_amqp(self):
112         """ run amqp consumer to collect the NFVi data """
113         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
114         amqp = AmqpConsumer(amqp_url, self._queue)
115         try:
116             amqp.run()
117         except (AttributeError, RuntimeError, KeyboardInterrupt):
118             amqp.stop()
119
120     @classmethod
121     def parse_simple_resource(cls, key, value):
122         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
123         return {reskey: value.split(":")[1]}
124
125     @classmethod
126     def get_cpu_data(cls, res_key0, res_key1, value):
127         """ Get cpu topology of the host """
128         pattern = r"-(\d+)"
129
130         if 'cpufreq' in res_key0:
131             metric, source = res_key0, res_key1
132         else:
133             metric, source = res_key1, res_key0
134
135         match = re.search(pattern, source, re.MULTILINE)
136         if not match:
137             return "error", "Invalid", "", ""
138
139         time, value = value.split(":")
140         return str(match.group(1)), metric, value, time
141
142     @classmethod
143     def parse_hugepages(cls, key, value):
144         return cls.parse_simple_resource(key, value)
145
146     @classmethod
147     def parse_dpdkstat(cls, key, value):
148         return cls.parse_simple_resource(key, value)
149
150     @classmethod
151     def parse_virt(cls, key, value):
152         return cls.parse_simple_resource(key, value)
153
154     @classmethod
155     def parse_ovs_stats(cls, key, value):
156         return cls.parse_simple_resource(key, value)
157
158     @classmethod
159     def parse_intel_pmu_stats(cls, key, value):
160         return {''.join(str(v) for v in key): value.split(":")[1]}
161
162     def parse_collectd_result(self, metrics):
163         """ convert collectd data into json"""
164         result = {
165             "cpu": {},
166             "memory": {},
167             "hugepages": {},
168             "dpdkstat": {},
169             "virt": {},
170             "ovs_stats": {},
171         }
172         testcase = ""
173
174         # unicode decode
175         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
176         for key, value in decoded:
177             key_split = key.split("/")
178             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
179             res_key0 = next(res_key_iter)
180             res_key1 = next(res_key_iter)
181
182             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
183                 cpu_key, name, metric, testcase = \
184                     self.get_cpu_data(res_key0, res_key1, value)
185                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
186
187             elif "memory" in res_key0:
188                 result["memory"].update({res_key1: value.split(":")[0]})
189
190             elif "hugepages" in res_key0:
191                 result["hugepages"].update(self.parse_hugepages(key_split, value))
192
193             elif "dpdkstat" in res_key0:
194                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
195
196             elif "virt" in res_key1:
197                 result["virt"].update(self.parse_virt(key_split, value))
198
199             elif "ovs_stats" in res_key0:
200                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
201
202         result["timestamp"] = testcase
203
204         return result
205
206     def amqp_process_for_nfvi_kpi(self):
207         """ amqp collect and return nfvi kpis """
208         if self.amqp_client is None and self.enable:
209             self.amqp_client = multiprocessing.Process(
210                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
211                 target=self.run_collectd_amqp)
212             self.amqp_client.start()
213
214     def amqp_collect_nfvi_kpi(self):
215         """ amqp collect and return nfvi kpis """
216         if not self.enable:
217             return {}
218
219         if self.check_if_system_agent_running("collectd")[0] != 0:
220             return {}
221
222         metric = {}
223         while not self._queue.empty():
224             metric.update(self._queue.get())
225
226         return self.parse_collectd_result(metric)
227
228     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
229         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
230                                                  nfvi_cfg).decode('utf-8')
231         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
232                                       finalize=finalize_for_yaml).render(
233             **template_kwargs)
234         # cfg_content = io.StringIO(template.format(**template_kwargs))
235         cfg_file = os.path.join(config_file_path, nfvi_cfg)
236         # must write as root, so use sudo
237         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
238
239     def _prepare_collectd_conf(self, config_file_path):
240         """ Prepare collectd conf """
241
242         kwargs = {
243             "interval": self.interval,
244             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
245             # Optional fields PortName is descriptive only, use whatever is present
246             "port_names": self.port_names,
247             # "ovs_bridge_interfaces": ["br-int"],
248             "plugins": self.plugins,
249         }
250         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
251
252     def _setup_ovs_stats(self, connection):
253         try:
254             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
255         except KeyError:
256             # ovs_stats is not a dict
257             socket_path = self.OVS_SOCKET_PATH
258         status = connection.execute("test -S {}".format(socket_path))[0]
259         if status != 0:
260             LOG.error("cannot find OVS socket %s", socket_path)
261
262     def _reset_rabbitmq(self, connection):
263         # Reset amqp queue
264         LOG.debug("reset and setup amqp to collect data from collectd")
265         # ensure collectd.conf.d exists to avoid error/warning
266         cmd_list = ["sudo mkdir -p /etc/collectd/collectd.conf.d",
267                     "sudo service rabbitmq-server restart",
268                     "sudo rabbitmqctl stop_app",
269                     "sudo rabbitmqctl reset",
270                     "sudo rabbitmqctl start_app",
271                     "sudo rabbitmqctl add_user admin admin",
272                     "sudo rabbitmqctl authenticate_user admin admin",
273                     "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
274                     ]
275
276         for cmd in cmd_list:
277             exit_status, _, stderr = connection.execute(cmd)
278             if exit_status != 0:
279                 raise ResourceCommandError(command=cmd, stderr=stderr)
280
281     def _check_rabbitmq_user(self, connection, user='admin'):
282         exit_status, stdout, _ = connection.execute("sudo rabbitmqctl list_users")
283         if exit_status == 0:
284             for line in stdout.split('\n')[1:]:
285                 if line.split('\t')[0] == user:
286                     return True
287
288     def _set_rabbitmq_admin_user(self, connection):
289         LOG.debug("add admin user to amqp")
290         cmd_list = ["sudo rabbitmqctl add_user admin admin",
291                     "sudo rabbitmqctl authenticate_user admin admin",
292                     "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
293                     ]
294
295         for cmd in cmd_list:
296             exit_status, stdout, stderr = connection.execute(cmd)
297             if exit_status != 0:
298                 raise ResourceCommandError(command=cmd, stdout=stdout, stderr=stderr)
299
300     def _start_rabbitmq(self, connection):
301         if self._reset_mq_flag:
302             self._reset_rabbitmq(connection)
303         else:
304             if not self._check_rabbitmq_user(connection):
305                 self._set_rabbitmq_admin_user(connection)
306
307         # check stdout for "sudo rabbitmqctl status" command
308         cmd = "sudo rabbitmqctl status"
309         _, stdout, stderr = connection.execute(cmd)
310         if not re.search("RabbitMQ", stdout):
311             LOG.error("rabbitmqctl status don't have RabbitMQ in running apps")
312             raise ResourceCommandError(command=cmd, stderr=stderr)
313
314     def _start_collectd(self, connection, bin_path):
315         LOG.debug("Starting collectd to collect NFVi stats")
316         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
317         config_file_path = os.path.join(bin_path, "collectd", "etc")
318         self._prepare_collectd_conf(config_file_path)
319
320         connection.execute('sudo pkill -x -9 collectd')
321         cmd = "which %s > /dev/null 2>&1" % collectd_path
322         exit_status, _, stderr = connection.execute(cmd)
323         if exit_status != 0:
324             raise ResourceCommandError(command=cmd, stderr=stderr)
325
326         if "ovs_stats" in self.plugins:
327             self._setup_ovs_stats(connection)
328
329         LOG.debug("Starting collectd to collect NFVi stats")
330         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
331         # intel_pmu plug requires large numbers of files open, so try to set
332         # ulimit -n to a large value
333
334         cmd = "sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path
335         exit_status, _, stderr = connection.execute(cmd, timeout=self.timeout)
336         if exit_status != 0:
337             raise ResourceCommandError(command=cmd, stderr=stderr)
338
339         LOG.debug("Done")
340
341     def initiate_systemagent(self, bin_path):
342         """ Start system agent for NFVi collection on host """
343         if self.enable:
344             try:
345                 self._start_rabbitmq(self.connection)
346                 self._start_collectd(self.connection, bin_path)
347             except ResourceCommandError as e:
348                 LOG.exception("Exception during collectd and rabbitmq start: %s", str(e))
349                 raise
350
351     def start(self):
352         """ start nfvi collection """
353         if self.enable:
354             LOG.debug("Start NVFi metric collection...")
355
356     def stop(self):
357         """ stop nfvi collection """
358         if not self.enable:
359             return
360
361         agent = "collectd"
362         LOG.debug("Stop resource monitor...")
363
364         if self.amqp_client is not None:
365             # we proper and try to join first
366             self.amqp_client.join(3)
367             self.amqp_client.terminate()
368
369         LOG.debug("Check if %s is running", agent)
370         status, pid = self.check_if_system_agent_running(agent)
371         LOG.debug("status %s  pid %s", status, pid)
372         if status != 0:
373             return
374
375         if pid:
376             self.connection.execute('sudo kill -9 "%s"' % pid)
377         self.connection.execute('sudo pkill -9 "%s"' % agent)
378
379         if self._reset_mq_flag:
380             self.connection.execute('sudo service rabbitmq-server stop')
381             self.connection.execute("sudo rabbitmqctl stop_app")