Merge "bugfix: remove pod_name in host and unify host parameter"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19 import logging
20 from itertools import chain
21
22 import errno
23 import jinja2
24 import os
25 import os.path
26 import re
27 import multiprocessing
28 import pkg_resources
29
30 from oslo_config import cfg
31 from oslo_utils.encodeutils import safe_decode
32
33 from yardstick import ssh
34 from yardstick.common.task_template import finalize_for_yaml
35 from yardstick.common.utils import validate_non_string_sequence
36 from yardstick.network_services.nfvi.collectd import AmqpConsumer
37
38
39 LOG = logging.getLogger(__name__)
40
41 CONF = cfg.CONF
42 ZMQ_OVS_PORT = 5567
43 ZMQ_POLLING_TIME = 12000
44 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
45                         "hugepages"]
46
47
48 class ResourceProfile(object):
49     """
50     This profile adds a resource at the beginning of the test session
51     """
52     COLLECTD_CONF = "collectd.conf"
53     AMPQ_PORT = 5672
54     DEFAULT_INTERVAL = 25
55     DEFAULT_TIMEOUT = 3600
56     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
57
58     def __init__(self, mgmt, port_names=None, cores=None, plugins=None,
59                  interval=None, timeout=None):
60
61         if plugins is None:
62             self.plugins = {}
63         else:
64             self.plugins = plugins
65
66         if interval is None:
67             self.interval = self.DEFAULT_INTERVAL
68         else:
69             self.interval = interval
70
71         if timeout is None:
72             self.timeout = self.DEFAULT_TIMEOUT
73         else:
74             self.timeout = timeout
75
76         self.enable = True
77         self._queue = multiprocessing.Queue()
78         self.amqp_client = None
79         self.port_names = validate_non_string_sequence(port_names, default=[])
80
81         # we need to save mgmt so we can connect to port 5672
82         self.mgmt = mgmt
83         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
84
85     @classmethod
86     def make_from_node(cls, node, timeout):
87         # node dict works as mgmt dict
88         # don't need port names, there is no way we can
89         # tell what port is used on the compute node
90         collectd_options = node["collectd"]
91         plugins = collectd_options.get("plugins", {})
92         interval = collectd_options.get("interval")
93
94         # use default cores = None to MatchAllCores
95         return cls(node, plugins=plugins, interval=interval, timeout=timeout)
96
97     def check_if_sa_running(self, process):
98         """ verify if system agent is running """
99         try:
100             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
101             # strip whitespace
102             return err, pid.strip()
103         except OSError as e:
104             if e.errno in {errno.ECONNRESET}:
105                 # if we can't connect to check, then we won't be able to connect to stop it
106                 LOG.exception("can't connect to host to check collectd status")
107                 return 1, None
108             raise
109
110     def run_collectd_amqp(self):
111         """ run amqp consumer to collect the NFVi data """
112         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
113         amqp = AmqpConsumer(amqp_url, self._queue)
114         try:
115             amqp.run()
116         except (AttributeError, RuntimeError, KeyboardInterrupt):
117             amqp.stop()
118
119     @classmethod
120     def parse_simple_resource(cls, key, value):
121         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
122         return {reskey: value.split(":")[1]}
123
124     @classmethod
125     def get_cpu_data(cls, res_key0, res_key1, value):
126         """ Get cpu topology of the host """
127         pattern = r"-(\d+)"
128
129         if 'cpufreq' in res_key0:
130             metric, source = res_key0, res_key1
131         else:
132             metric, source = res_key1, res_key0
133
134         match = re.search(pattern, source, re.MULTILINE)
135         if not match:
136             return "error", "Invalid", "", ""
137
138         time, value = value.split(":")
139         return str(match.group(1)), metric, value, time
140
141     @classmethod
142     def parse_hugepages(cls, key, value):
143         return cls.parse_simple_resource(key, value)
144
145     @classmethod
146     def parse_dpdkstat(cls, key, value):
147         return cls.parse_simple_resource(key, value)
148
149     @classmethod
150     def parse_virt(cls, key, value):
151         return cls.parse_simple_resource(key, value)
152
153     @classmethod
154     def parse_ovs_stats(cls, key, value):
155         return cls.parse_simple_resource(key, value)
156
157     @classmethod
158     def parse_intel_pmu_stats(cls, key, value):
159         return {''.join(str(v) for v in key): value.split(":")[1]}
160
161     def parse_collectd_result(self, metrics):
162         """ convert collectd data into json"""
163         result = {
164             "cpu": {},
165             "memory": {},
166             "hugepages": {},
167             "dpdkstat": {},
168             "virt": {},
169             "ovs_stats": {},
170         }
171         testcase = ""
172
173         # unicode decode
174         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
175         for key, value in decoded:
176             key_split = key.split("/")
177             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
178             res_key0 = next(res_key_iter)
179             res_key1 = next(res_key_iter)
180
181             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
182                 cpu_key, name, metric, testcase = \
183                     self.get_cpu_data(res_key0, res_key1, value)
184                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
185
186             elif "memory" in res_key0:
187                 result["memory"].update({res_key1: value.split(":")[0]})
188
189             elif "hugepages" in res_key0:
190                 result["hugepages"].update(self.parse_hugepages(key_split, value))
191
192             elif "dpdkstat" in res_key0:
193                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
194
195             elif "virt" in res_key1:
196                 result["virt"].update(self.parse_virt(key_split, value))
197
198             elif "ovs_stats" in res_key0:
199                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
200
201         result["timestamp"] = testcase
202
203         return result
204
205     def amqp_process_for_nfvi_kpi(self):
206         """ amqp collect and return nfvi kpis """
207         if self.amqp_client is None and self.enable:
208             self.amqp_client = multiprocessing.Process(
209                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
210                 target=self.run_collectd_amqp)
211             self.amqp_client.start()
212
213     def amqp_collect_nfvi_kpi(self):
214         """ amqp collect and return nfvi kpis """
215         if not self.enable:
216             return {}
217
218         metric = {}
219         while not self._queue.empty():
220             metric.update(self._queue.get())
221         msg = self.parse_collectd_result(metric)
222         return msg
223
224     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
225         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
226                                                  nfvi_cfg).decode('utf-8')
227         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
228                                       finalize=finalize_for_yaml).render(
229             **template_kwargs)
230         # cfg_content = io.StringIO(template.format(**template_kwargs))
231         cfg_file = os.path.join(config_file_path, nfvi_cfg)
232         # must write as root, so use sudo
233         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
234
235     def _prepare_collectd_conf(self, config_file_path):
236         """ Prepare collectd conf """
237
238         kwargs = {
239             "interval": self.interval,
240             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
241             # Optional fields PortName is descriptive only, use whatever is present
242             "port_names": self.port_names,
243             # "ovs_bridge_interfaces": ["br-int"],
244             "plugins": self.plugins,
245         }
246         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
247
248     def _setup_intel_pmu(self, connection, bin_path):
249         pmu_event_path = os.path.join(bin_path, "pmu_event.json")
250         try:
251             self.plugins["intel_pmu"]["pmu_event_path"] = pmu_event_path
252         except KeyError:
253             # if intel_pmu is not a dict, force it into a dict
254             self.plugins["intel_pmu"] = {"pmu_event_path": pmu_event_path}
255         LOG.debug("Downloading event list for pmu_stats plugin")
256         cmd = 'cd {0}; PMU_EVENTS_PATH={1} python event_download_local.py'.format(
257             bin_path, pmu_event_path)
258         cmd = "sudo bash -c '{}'".format(cmd)
259         connection.execute(cmd)
260
261     def _setup_ovs_stats(self, connection):
262         try:
263             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
264         except KeyError:
265             # ovs_stats is not a dict
266             socket_path = self.OVS_SOCKET_PATH
267         status = connection.execute("test -S {}".format(socket_path))[0]
268         if status != 0:
269             LOG.error("cannot find OVS socket %s", socket_path)
270
271     def _start_collectd(self, connection, bin_path):
272         LOG.debug("Starting collectd to collect NFVi stats")
273         connection.execute('sudo pkill -x -9 collectd')
274         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
275         config_file_path = os.path.join(bin_path, "collectd", "etc")
276         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
277         if exit_status != 0:
278             LOG.warning("%s is not present disabling", collectd_path)
279             # disable auto-provisioning because it requires Internet access
280             # collectd_installer = os.path.join(bin_path, "collectd.sh")
281             # provision_tool(connection, collectd)
282             # http_proxy = os.environ.get('http_proxy', '')
283             # https_proxy = os.environ.get('https_proxy', '')
284             # connection.execute("sudo %s '%s' '%s'" % (
285             #     collectd_installer, http_proxy, https_proxy))
286             return
287         if "intel_pmu" in self.plugins:
288             self._setup_intel_pmu(connection, bin_path)
289         if "ovs_stats" in self.plugins:
290             self._setup_ovs_stats(connection)
291
292         LOG.debug("Starting collectd to collect NFVi stats")
293         # ensure collectd.conf.d exists to avoid error/warning
294         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
295         self._prepare_collectd_conf(config_file_path)
296
297         # Reset amqp queue
298         LOG.debug("reset and setup amqp to collect data from collectd")
299         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
300         connection.execute("sudo service rabbitmq-server start")
301         connection.execute("sudo rabbitmqctl stop_app")
302         connection.execute("sudo rabbitmqctl reset")
303         connection.execute("sudo rabbitmqctl start_app")
304         connection.execute("sudo service rabbitmq-server restart")
305
306         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
307         connection.execute("sudo rabbitmqctl delete_user guest")
308         connection.execute("sudo rabbitmqctl add_user admin admin")
309         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
310         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
311
312         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
313         # intel_pmu plug requires large numbers of files open, so try to set
314         # ulimit -n to a large value
315         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
316                            timeout=self.timeout)
317         LOG.debug("Done")
318
319     def initiate_systemagent(self, bin_path):
320         """ Start system agent for NFVi collection on host """
321         if self.enable:
322             try:
323                 self._start_collectd(self.connection, bin_path)
324             except Exception:
325                 LOG.exception("Exception during collectd start")
326                 raise
327
328     def start(self):
329         """ start nfvi collection """
330         if self.enable:
331             LOG.debug("Start NVFi metric collection...")
332
333     def stop(self):
334         """ stop nfvi collection """
335         if not self.enable:
336             return
337
338         agent = "collectd"
339         LOG.debug("Stop resource monitor...")
340
341         if self.amqp_client is not None:
342             # we proper and try to join first
343             self.amqp_client.join(3)
344             self.amqp_client.terminate()
345
346         LOG.debug("Check if %s is running", agent)
347         status, pid = self.check_if_sa_running(agent)
348         LOG.debug("status %s  pid %s", status, pid)
349         if status != 0:
350             return
351
352         if pid:
353             self.connection.execute('sudo kill -9 "%s"' % pid)
354         self.connection.execute('sudo pkill -9 "%s"' % agent)
355         self.connection.execute('sudo service rabbitmq-server stop')
356         self.connection.execute("sudo rabbitmqctl stop_app")