yardstick/network_services/nfvi/resource.py
# Copyright (c) 2016-2017 Intel Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Resource collection definitions """

from __future__ import absolute_import
from __future__ import print_function

import logging
from itertools import chain

import errno
import jinja2
import os
import os.path
import re
import multiprocessing
import pkg_resources

from oslo_config import cfg
from oslo_utils.encodeutils import safe_decode

from yardstick import ssh
from yardstick.common.task_template import finalize_for_yaml
from yardstick.common.utils import validate_non_string_sequence
from yardstick.network_services.nfvi.collectd import AmqpConsumer


LOG = logging.getLogger(__name__)

CONF = cfg.CONF
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
                        "hugepages"]


class ResourceProfile(object):
    """
    Sets up NFVi resource collection (collectd reporting over AMQP) on a host
    at the beginning of the test session.
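
    A minimal usage sketch (node_dict and the bin path below are illustrative
    assumptions; real values come from the test context):

        # node_dict: management node definition as used by ssh.AutoConnectSSH
        profile = ResourceProfile(mgmt=node_dict, plugins={"ovs_stats": {}})
        profile.initiate_systemagent("/opt/nsb_bin")
        profile.amqp_process_for_nfvi_kpi()
        kpis = profile.amqp_collect_nfvi_kpi()
        profile.stop()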
    """
    COLLECTD_CONF = "collectd.conf"
    AMPQ_PORT = 5672
    DEFAULT_INTERVAL = 25
    DEFAULT_TIMEOUT = 3600
    OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"

    def __init__(self, mgmt, port_names=None, cores=None, plugins=None,
                 interval=None, timeout=None):

        if plugins is None:
            self.plugins = {}
        else:
            self.plugins = plugins

        if interval is None:
            self.interval = self.DEFAULT_INTERVAL
        else:
            self.interval = interval

        if timeout is None:
            self.timeout = self.DEFAULT_TIMEOUT
        else:
            self.timeout = timeout

        self.enable = True
        self._queue = multiprocessing.Queue()
        self.amqp_client = None
        self.port_names = validate_non_string_sequence(port_names, default=[])

        # we need to save mgmt so we can connect to port 5672
        self.mgmt = mgmt
        self.connection = ssh.AutoConnectSSH.from_node(mgmt)

    def check_if_sa_running(self, process):
        """ verify if system agent is running """
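        # pgrep exits non-zero when no matching process is found, so the
        # returned status doubles as a "not running" indicator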
        try:
            err, pid, _ = self.connection.execute("pgrep -f %s" % process)
            # strip whitespace
            return err, pid.strip()
        except OSError as e:
            if e.errno in {errno.ECONNRESET}:
                # if we can't connect to check, then we won't be able to connect to stop it
                LOG.exception("can't connect to host to check collectd status")
                return 1, None
            raise

    def run_collectd_amqp(self):
        """ run amqp consumer to collect the NFVi data """
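        # the admin:admin credentials must match the RabbitMQ user created
        # in _start_collectd()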
        amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
        amqp = AmqpConsumer(amqp_url, self._queue)
        try:
            amqp.run()
        except (AttributeError, RuntimeError, KeyboardInterrupt):
            amqp.stop()

    @classmethod
    def parse_simple_resource(cls, key, value):
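        """ join the key path and keep only the value from "time:value" """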
        reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
        return {reskey: value.split(":")[1]}

    @classmethod
    def get_cpu_data(cls, res_key0, res_key1, value):
        """ parse a per-CPU sample into (cpu index, metric, value, time) """
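        # the CPU index is embedded in the key, e.g. "cpu-0" or "cpufreq-0"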
        pattern = r"-(\d+)"

        if 'cpufreq' in res_key0:
            metric, source = res_key0, res_key1
        else:
            metric, source = res_key1, res_key0

        match = re.search(pattern, source, re.MULTILINE)
        if not match:
            return "error", "Invalid", "", ""

        time, value = value.split(":")
        return str(match.group(1)), metric, value, time

    @classmethod
    def parse_hugepages(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_dpdkstat(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_virt(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_ovs_stats(cls, key, value):
        return cls.parse_simple_resource(key, value)

    @classmethod
    def parse_intel_pmu_stats(cls, key, value):
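        """ flatten the key path into one name, keep only the value part """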
        return {''.join(str(v) for v in key): value.split(":")[1]}

    def parse_collectd_result(self, metrics):
        """ convert collectd data into a dict of NFVi metrics """
        result = {
            "cpu": {},
            "memory": {},
            "hugepages": {},
            "dpdkstat": {},
            "virt": {},
            "ovs_stats": {},
        }
        testcase = ""

        # unicode decode
        decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
        for key, value in decoded:
            key_split = key.split("/")
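            # the first two components outside the "nsb_stats" prefix identify
            # the metric source (plugin instance) and the metric name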
            res_key_iter = (key for key in key_split if "nsb_stats" not in key)
            res_key0 = next(res_key_iter)
            res_key1 = next(res_key_iter)

            if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
                cpu_key, name, metric, testcase = \
                    self.get_cpu_data(res_key0, res_key1, value)
                result["cpu"].setdefault(cpu_key, {}).update({name: metric})

            elif "memory" in res_key0:
                # collectd values are "time:value"; keep the value, as the
                # other parsers do
                result["memory"].update({res_key1: value.split(":")[1]})

            elif "hugepages" in res_key0:
                result["hugepages"].update(self.parse_hugepages(key_split, value))

            elif "dpdkstat" in res_key0:
                result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))

            elif "virt" in res_key1:
                result["virt"].update(self.parse_virt(key_split, value))

            elif "ovs_stats" in res_key0:
                result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))

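        # the timestamp of the last parsed CPU sample stands in as the
        # timestamp for the whole result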
        result["timestamp"] = testcase

        return result

    def amqp_process_for_nfvi_kpi(self):
        """ start the amqp consumer process that collects nfvi kpis """
        if self.amqp_client is None and self.enable:
            self.amqp_client = multiprocessing.Process(
                name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
                target=self.run_collectd_amqp)
            self.amqp_client.start()

    def amqp_collect_nfvi_kpi(self):
        """ amqp collect and return nfvi kpis """
        if not self.enable:
            return {}

        metric = {}
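        # drain every queued collectd sample before parsing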
        while not self._queue.empty():
            metric.update(self._queue.get())
        msg = self.parse_collectd_result(metric)
        return msg

    def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
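        """ render the collectd config template and write it to the remote host """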
        template = pkg_resources.resource_string("yardstick.network_services.nfvi",
                                                 nfvi_cfg).decode('utf-8')
        cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
                                      finalize=finalize_for_yaml).render(
            **template_kwargs)
        # cfg_content = io.StringIO(template.format(**template_kwargs))
        cfg_file = os.path.join(config_file_path, nfvi_cfg)
        # must write as root, so use sudo
        self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)

    def _prepare_collectd_conf(self, config_file_path):
        """ Prepare collectd conf """

        kwargs = {
            "interval": self.interval,
            "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
            # the optional PortName field is descriptive only, use whatever is present
            "port_names": self.port_names,
            # "ovs_bridge_interfaces": ["br-int"],
            "plugins": self.plugins,
        }
        self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)

    def _setup_intel_pmu(self, connection, bin_path):
        pmu_event_path = os.path.join(bin_path, "pmu_event.json")
        try:
            self.plugins["intel_pmu"]["pmu_event_path"] = pmu_event_path
        except (KeyError, TypeError):
            # if intel_pmu is not a dict, force it into a dict
            self.plugins["intel_pmu"] = {"pmu_event_path": pmu_event_path}
        LOG.debug("Downloading event list for pmu_stats plugin")
        cmd = 'cd {0}; PMU_EVENTS_PATH={1} python event_download_local.py'.format(
            bin_path, pmu_event_path)
        cmd = "sudo bash -c '{}'".format(cmd)
        connection.execute(cmd)

    def _setup_ovs_stats(self, connection):
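        """ check that the OVS DB socket used by the ovs_stats plugin exists """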
        try:
            socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
        except (KeyError, AttributeError):
            # ovs_stats is not a dict, fall back to the default socket path
            socket_path = self.OVS_SOCKET_PATH
        status = connection.execute("test -S {}".format(socket_path))[0]
        if status != 0:
            LOG.error("cannot find OVS socket %s", socket_path)

    def _start_collectd(self, connection, bin_path):
        LOG.debug("Starting collectd to collect NFVi stats")
        connection.execute('sudo pkill -x -9 collectd')
        collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
        config_file_path = os.path.join(bin_path, "collectd", "etc")
        exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
        if exit_status != 0:
            LOG.warning("%s is not present, disabling NFVi stats collection", collectd_path)
            # disable auto-provisioning because it requires Internet access
            # collectd_installer = os.path.join(bin_path, "collectd.sh")
            # provision_tool(connection, collectd)
            # http_proxy = os.environ.get('http_proxy', '')
            # https_proxy = os.environ.get('https_proxy', '')
            # connection.execute("sudo %s '%s' '%s'" % (
            #     collectd_installer, http_proxy, https_proxy))
            return
        if "intel_pmu" in self.plugins:
            self._setup_intel_pmu(connection, bin_path)
        if "ovs_stats" in self.plugins:
            self._setup_ovs_stats(connection)

        LOG.debug("Preparing collectd configuration")
        # ensure collectd.conf.d exists to avoid error/warning
        connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
        self._prepare_collectd_conf(config_file_path)

        # Reset amqp queue
        LOG.debug("reset and setup amqp to collect data from collectd")
        connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
        connection.execute("sudo service rabbitmq-server start")
        connection.execute("sudo rabbitmqctl stop_app")
        connection.execute("sudo rabbitmqctl reset")
        connection.execute("sudo rabbitmqctl start_app")
        connection.execute("sudo service rabbitmq-server restart")

        LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
        connection.execute("sudo rabbitmqctl delete_user guest")
        connection.execute("sudo rabbitmqctl add_user admin admin")
        connection.execute("sudo rabbitmqctl authenticate_user admin admin")
        connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")

        LOG.debug("Start collectd service... %s second timeout", self.timeout)
        # the intel_pmu plugin requires a large number of open files, so try
        # to raise ulimit -n to a large value
        connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
                           timeout=self.timeout)
        LOG.debug("Done")

    def initiate_systemagent(self, bin_path):
        """ Start system agent for NFVi collection on host """
        if self.enable:
            try:
                self._start_collectd(self.connection, bin_path)
            except Exception:
                LOG.exception("Exception during collectd start")
                raise

    def start(self):
        """ start nfvi collection """
        if self.enable:
            LOG.debug("Start NFVi metric collection...")

    def stop(self):
        """ stop nfvi collection """
        if not self.enable:
            return

        agent = "collectd"
        LOG.debug("Stop resource monitor...")

        if self.amqp_client is not None:
            # be polite and try to join the consumer process before terminating it
            self.amqp_client.join(3)
            self.amqp_client.terminate()

        LOG.debug("Check if %s is running", agent)
        status, pid = self.check_if_sa_running(agent)
        LOG.debug("status %s  pid %s", status, pid)
        if status != 0:
            return

        if pid:
            self.connection.execute('sudo kill -9 "%s"' % pid)
        self.connection.execute('sudo pkill -9 "%s"' % agent)
        self.connection.execute('sudo service rabbitmq-server stop')
        self.connection.execute("sudo rabbitmqctl stop_app")