add yardstick iruya 9.0.0 release notes
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 import errno
17 from itertools import chain
18 import logging
19 import multiprocessing
20 import os
21 import os.path
22 import re
23
24 import jinja2
25 import pkg_resources
26 from oslo_config import cfg
27 from oslo_utils.encodeutils import safe_decode
28
29 from yardstick import ssh
30 from yardstick.common.exceptions import ResourceCommandError
31 from yardstick.common.task_template import finalize_for_yaml
32 from yardstick.common.utils import validate_non_string_sequence
33 from yardstick.network_services.nfvi.collectd import AmqpConsumer
34 from yardstick.benchmark.contexts import heat
35
36
37 LOG = logging.getLogger(__name__)
38
39 CONF = cfg.CONF
40 ZMQ_OVS_PORT = 5567
41 ZMQ_POLLING_TIME = 12000
42 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
43                         "hugepages"]
44
45
46 class ResourceProfile(object):
47     """
48     This profile adds a resource at the beginning of the test session
49     """
50     COLLECTD_CONF = "collectd.conf"
51     BAR_COLLECTD_CONF_PATH = "/opt/collectd/etc/collectd.conf.d/"
52     AMPQ_PORT = 5672
53     DEFAULT_INTERVAL = 25
54     DEFAULT_TIMEOUT = 3600
55     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
56
57     def __init__(self, mgmt, port_names=None, plugins=None,
58                  interval=None, timeout=None, reset_mq_flag=True):
59
60         if plugins is None:
61             self.plugins = {}
62         else:
63             self.plugins = plugins
64
65         if interval is None:
66             self.interval = self.DEFAULT_INTERVAL
67         else:
68             self.interval = interval
69
70         if timeout is None:
71             self.timeout = self.DEFAULT_TIMEOUT
72         else:
73             self.timeout = timeout
74
75         self.enable = True
76         self._queue = multiprocessing.Queue()
77         self.amqp_client = None
78         self.port_names = validate_non_string_sequence(port_names, default=[])
79
80         # we need to save mgmt so we can connect to port 5672
81         self.mgmt = mgmt
82         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
83         self._reset_mq_flag = reset_mq_flag
84
85     @classmethod
86     def make_from_node(cls, node, timeout):
87         # node dict works as mgmt dict
88         # don't need port names, there is no way we can
89         # tell what port is used on the compute node
90         collectd_options = node["collectd"]
91         plugins = collectd_options.get("plugins", {})
92         interval = collectd_options.get("interval")
93
94         reset_mq_flag = (False if node.get("ctx_type") == heat.HeatContext.__context_type__
95                           else True)
96         return cls(node, plugins=plugins, interval=interval,
97                    timeout=timeout, reset_mq_flag=reset_mq_flag)
98
99     def check_if_system_agent_running(self, process):
100         """ verify if system agent is running """
101         try:
102             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
103             # strip whitespace
104             return err, pid.strip()
105         except OSError as e:
106             if e.errno in {errno.ECONNRESET}:
107                 # if we can't connect to check, then we won't be able to connect to stop it
108                 LOG.exception("Can't connect to host to check %s status", process)
109                 return 1, None
110             raise
111
112     def run_collectd_amqp(self):
113         """ run amqp consumer to collect the NFVi data """
114         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
115         amqp = AmqpConsumer(amqp_url, self._queue)
116         try:
117             amqp.run()
118         except (AttributeError, RuntimeError, KeyboardInterrupt):
119             amqp.stop()
120
121     @classmethod
122     def parse_simple_resource(cls, key, value):
123         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
124         return {reskey: value.split(":")[1]}
125
126     @classmethod
127     def get_cpu_data(cls, res_key0, res_key1, value):
128         """ Get cpu topology of the host """
129         pattern = r"-(\d+)"
130
131         if 'cpufreq' in res_key0:
132             metric, source = res_key0, res_key1
133         else:
134             metric, source = res_key1, res_key0
135
136         match = re.search(pattern, source, re.MULTILINE)
137         if not match:
138             return "error", "Invalid", "", ""
139
140         time, value = value.split(":")
141         return str(match.group(1)), metric, value, time
142
143     @classmethod
144     def parse_hugepages(cls, key, value):
145         return cls.parse_simple_resource(key, value)
146
147     @classmethod
148     def parse_dpdkstat(cls, key, value):
149         return cls.parse_simple_resource(key, value)
150
151     @classmethod
152     def parse_virt(cls, key, value):
153         return cls.parse_simple_resource(key, value)
154
155     @classmethod
156     def parse_ovs_stats(cls, key, value):
157         return cls.parse_simple_resource(key, value)
158
159     @classmethod
160     def parse_intel_pmu_stats(cls, key, value):
161         return {''.join(str(v) for v in key): value.split(":")[1]}
162
163     def parse_collectd_result(self, metrics):
164         """ convert collectd data into json"""
165         result = {
166             "cpu": {},
167             "memory": {},
168             "hugepages": {},
169             "dpdkstat": {},
170             "virt": {},
171             "ovs_stats": {},
172         }
173         testcase = ""
174
175         # unicode decode
176         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
177         for key, value in decoded:
178             key_split = key.split("/")
179             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
180             res_key0 = next(res_key_iter)
181             res_key1 = next(res_key_iter)
182
183             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
184                 cpu_key, name, metric, testcase = \
185                     self.get_cpu_data(res_key0, res_key1, value)
186                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
187
188             elif "memory" in res_key0:
189                 result["memory"].update({res_key1: value.split(":")[0]})
190
191             elif "hugepages" in res_key0:
192                 result["hugepages"].update(self.parse_hugepages(key_split, value))
193
194             elif "dpdkstat" in res_key0:
195                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
196
197             elif "virt" in res_key1:
198                 result["virt"].update(self.parse_virt(key_split, value))
199
200             elif "ovs_stats" in res_key0:
201                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
202
203         result["timestamp"] = testcase
204
205         return result
206
207     def amqp_process_for_nfvi_kpi(self):
208         """ amqp collect and return nfvi kpis """
209         if self.amqp_client is None and self.enable:
210             self.amqp_client = multiprocessing.Process(
211                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
212                 target=self.run_collectd_amqp)
213             self.amqp_client.start()
214
215     def amqp_collect_nfvi_kpi(self):
216         """ amqp collect and return nfvi kpis """
217         if not self.enable:
218             return {}
219
220         if self.check_if_system_agent_running("collectd")[0] != 0:
221             return {}
222
223         metric = {}
224         while not self._queue.empty():
225             metric.update(self._queue.get())
226
227         return self.parse_collectd_result(metric)
228
229     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
230         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
231                                                  nfvi_cfg).decode('utf-8')
232         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
233                                       finalize=finalize_for_yaml).render(
234             **template_kwargs)
235         # cfg_content = io.StringIO(template.format(**template_kwargs))
236         cfg_file = os.path.join(config_file_path, nfvi_cfg)
237         # must write as root, so use sudo
238         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
239
240     def _prepare_collectd_conf(self, config_file_path):
241         """ Prepare collectd conf """
242
243         kwargs = {
244             "interval": self.interval,
245             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
246             # Optional fields PortName is descriptive only, use whatever is present
247             "port_names": self.port_names,
248             # "ovs_bridge_interfaces": ["br-int"],
249             "plugins": self.plugins,
250         }
251         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
252         self._provide_config_file(self.BAR_COLLECTD_CONF_PATH,
253                                   self.COLLECTD_CONF, kwargs)
254
255     def _setup_ovs_stats(self, connection):
256         try:
257             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
258         except KeyError:
259             # ovs_stats is not a dict
260             socket_path = self.OVS_SOCKET_PATH
261         status = connection.execute("test -S {}".format(socket_path))[0]
262         if status != 0:
263             LOG.error("cannot find OVS socket %s", socket_path)
264
265     def _reset_rabbitmq(self, connection):
266         # Reset amqp queue
267         LOG.debug("reset and setup amqp to collect data from collectd")
268         # ensure collectd.conf.d exists to avoid error/warning
269         cmd_list = ["sudo mkdir -p /etc/collectd/collectd.conf.d",
270                     "sudo service rabbitmq-server restart",
271                     "sudo rabbitmqctl stop_app",
272                     "sudo rabbitmqctl reset",
273                     "sudo rabbitmqctl start_app",
274                     "sudo rabbitmqctl add_user admin admin",
275                     "sudo rabbitmqctl authenticate_user admin admin",
276                     "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
277                     ]
278
279         for cmd in cmd_list:
280             exit_status, _, stderr = connection.execute(cmd)
281             if exit_status != 0:
282                 raise ResourceCommandError(command=cmd, stderr=stderr)
283
284     def _check_rabbitmq_user(self, connection, user='admin'):
285         exit_status, stdout, _ = connection.execute("sudo rabbitmqctl list_users")
286         if exit_status == 0:
287             for line in stdout.split('\n')[1:]:
288                 if line.split('\t')[0] == user:
289                     return True
290
291     def _set_rabbitmq_admin_user(self, connection):
292         LOG.debug("add admin user to amqp")
293         cmd_list = ["sudo rabbitmqctl add_user admin admin",
294                     "sudo rabbitmqctl authenticate_user admin admin",
295                     "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
296                     ]
297
298         for cmd in cmd_list:
299             exit_status, stdout, stderr = connection.execute(cmd)
300             if exit_status != 0:
301                 raise ResourceCommandError(command=cmd, stdout=stdout, stderr=stderr)
302
303     def _start_rabbitmq(self, connection):
304         if self._reset_mq_flag:
305             self._reset_rabbitmq(connection)
306         else:
307             if not self._check_rabbitmq_user(connection):
308                 self._set_rabbitmq_admin_user(connection)
309
310         # check stdout for "sudo rabbitmqctl status" command
311         cmd = "sudo rabbitmqctl status"
312         _, stdout, stderr = connection.execute(cmd)
313         if not re.search("RabbitMQ", stdout):
314             LOG.error("rabbitmqctl status don't have RabbitMQ in running apps")
315             raise ResourceCommandError(command=cmd, stderr=stderr)
316
317     def _start_collectd(self, connection, bin_path):
318         LOG.debug("Starting collectd to collect NFVi stats")
319         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
320         config_file_path = os.path.join(bin_path, "collectd", "etc")
321         self._prepare_collectd_conf(config_file_path)
322
323         connection.execute('sudo pkill -x -9 collectd')
324         cmd = "which %s > /dev/null 2>&1" % collectd_path
325         exit_status, _, stderr = connection.execute(cmd)
326         if exit_status != 0:
327             raise ResourceCommandError(command=cmd, stderr=stderr)
328
329         if "ovs_stats" in self.plugins:
330             self._setup_ovs_stats(connection)
331
332         LOG.debug("Starting collectd to collect NFVi stats")
333         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
334         # intel_pmu plug requires large numbers of files open, so try to set
335         # ulimit -n to a large value
336
337         cmd = "sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path
338         exit_status, _, stderr = connection.execute(cmd, timeout=self.timeout)
339         if exit_status != 0:
340             raise ResourceCommandError(command=cmd, stderr=stderr)
341
342         LOG.debug("Done")
343
344     def initiate_systemagent(self, bin_path):
345         """ Start system agent for NFVi collection on host """
346         if self.enable:
347             try:
348                 self._start_rabbitmq(self.connection)
349                 self._start_collectd(self.connection, bin_path)
350             except ResourceCommandError as e:
351                 LOG.exception("Exception during collectd and rabbitmq start: %s", str(e))
352                 raise
353
354     def start(self):
355         """ start nfvi collection """
356         if self.enable:
357             LOG.debug("Start NVFi metric collection...")
358
359     def stop(self):
360         """ stop nfvi collection """
361         if not self.enable:
362             return
363
364         agent = "collectd"
365         LOG.debug("Stop resource monitor...")
366
367         if self.amqp_client is not None:
368             # we proper and try to join first
369             self.amqp_client.join(3)
370             self.amqp_client.terminate()
371
372         LOG.debug("Check if %s is running", agent)
373         status, pid = self.check_if_system_agent_running(agent)
374         LOG.debug("status %s  pid %s", status, pid)
375         if status != 0:
376             return
377
378         if pid:
379             self.connection.execute('sudo kill -9 "%s"' % pid)
380         self.connection.execute('sudo pkill -9 "%s"' % agent)
381
382         if self._reset_mq_flag:
383             self.connection.execute('sudo service rabbitmq-server stop')
384             self.connection.execute("sudo rabbitmqctl stop_app")