update docker version to 16.04
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19 import logging
20 from itertools import chain
21
22 import errno
23 import jinja2
24 import os
25 import os.path
26 import re
27 import multiprocessing
28 import pkg_resources
29
30 from oslo_config import cfg
31 from oslo_utils.encodeutils import safe_decode
32
33 from yardstick import ssh
34 from yardstick.common.task_template import finalize_for_yaml
35 from yardstick.common.utils import validate_non_string_sequence
36 from yardstick.network_services.nfvi.collectd import AmqpConsumer
37
38
39 LOG = logging.getLogger(__name__)
40
41 CONF = cfg.CONF
42 ZMQ_OVS_PORT = 5567
43 ZMQ_POLLING_TIME = 12000
44 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
45                         "hugepages"]
46
47
48 class ResourceProfile(object):
49     """
50     This profile adds a resource at the beginning of the test session
51     """
52     COLLECTD_CONF = "collectd.conf"
53     AMPQ_PORT = 5672
54     DEFAULT_INTERVAL = 25
55     DEFAULT_TIMEOUT = 3600
56     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
57
58     def __init__(self, mgmt, port_names=None, plugins=None, interval=None, timeout=None):
59
60         if plugins is None:
61             self.plugins = {}
62         else:
63             self.plugins = plugins
64
65         if interval is None:
66             self.interval = self.DEFAULT_INTERVAL
67         else:
68             self.interval = interval
69
70         if timeout is None:
71             self.timeout = self.DEFAULT_TIMEOUT
72         else:
73             self.timeout = timeout
74
75         self.enable = True
76         self._queue = multiprocessing.Queue()
77         self.amqp_client = None
78         self.port_names = validate_non_string_sequence(port_names, default=[])
79
80         # we need to save mgmt so we can connect to port 5672
81         self.mgmt = mgmt
82         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
83
84     @classmethod
85     def make_from_node(cls, node, timeout):
86         # node dict works as mgmt dict
87         # don't need port names, there is no way we can
88         # tell what port is used on the compute node
89         collectd_options = node["collectd"]
90         plugins = collectd_options.get("plugins", {})
91         interval = collectd_options.get("interval")
92
93         return cls(node, plugins=plugins, interval=interval, timeout=timeout)
94
95     def check_if_sa_running(self, process):
96         """ verify if system agent is running """
97         try:
98             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
99             # strip whitespace
100             return err, pid.strip()
101         except OSError as e:
102             if e.errno in {errno.ECONNRESET}:
103                 # if we can't connect to check, then we won't be able to connect to stop it
104                 LOG.exception("can't connect to host to check collectd status")
105                 return 1, None
106             raise
107
108     def run_collectd_amqp(self):
109         """ run amqp consumer to collect the NFVi data """
110         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
111         amqp = AmqpConsumer(amqp_url, self._queue)
112         try:
113             amqp.run()
114         except (AttributeError, RuntimeError, KeyboardInterrupt):
115             amqp.stop()
116
117     @classmethod
118     def parse_simple_resource(cls, key, value):
119         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
120         return {reskey: value.split(":")[1]}
121
122     @classmethod
123     def get_cpu_data(cls, res_key0, res_key1, value):
124         """ Get cpu topology of the host """
125         pattern = r"-(\d+)"
126
127         if 'cpufreq' in res_key0:
128             metric, source = res_key0, res_key1
129         else:
130             metric, source = res_key1, res_key0
131
132         match = re.search(pattern, source, re.MULTILINE)
133         if not match:
134             return "error", "Invalid", "", ""
135
136         time, value = value.split(":")
137         return str(match.group(1)), metric, value, time
138
139     @classmethod
140     def parse_hugepages(cls, key, value):
141         return cls.parse_simple_resource(key, value)
142
143     @classmethod
144     def parse_dpdkstat(cls, key, value):
145         return cls.parse_simple_resource(key, value)
146
147     @classmethod
148     def parse_virt(cls, key, value):
149         return cls.parse_simple_resource(key, value)
150
151     @classmethod
152     def parse_ovs_stats(cls, key, value):
153         return cls.parse_simple_resource(key, value)
154
155     @classmethod
156     def parse_intel_pmu_stats(cls, key, value):
157         return {''.join(str(v) for v in key): value.split(":")[1]}
158
159     def parse_collectd_result(self, metrics):
160         """ convert collectd data into json"""
161         result = {
162             "cpu": {},
163             "memory": {},
164             "hugepages": {},
165             "dpdkstat": {},
166             "virt": {},
167             "ovs_stats": {},
168         }
169         testcase = ""
170
171         # unicode decode
172         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
173         for key, value in decoded:
174             key_split = key.split("/")
175             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
176             res_key0 = next(res_key_iter)
177             res_key1 = next(res_key_iter)
178
179             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
180                 cpu_key, name, metric, testcase = \
181                     self.get_cpu_data(res_key0, res_key1, value)
182                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
183
184             elif "memory" in res_key0:
185                 result["memory"].update({res_key1: value.split(":")[0]})
186
187             elif "hugepages" in res_key0:
188                 result["hugepages"].update(self.parse_hugepages(key_split, value))
189
190             elif "dpdkstat" in res_key0:
191                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
192
193             elif "virt" in res_key1:
194                 result["virt"].update(self.parse_virt(key_split, value))
195
196             elif "ovs_stats" in res_key0:
197                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
198
199         result["timestamp"] = testcase
200
201         return result
202
203     def amqp_process_for_nfvi_kpi(self):
204         """ amqp collect and return nfvi kpis """
205         if self.amqp_client is None and self.enable:
206             self.amqp_client = multiprocessing.Process(
207                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
208                 target=self.run_collectd_amqp)
209             self.amqp_client.start()
210
211     def amqp_collect_nfvi_kpi(self):
212         """ amqp collect and return nfvi kpis """
213         if not self.enable:
214             return {}
215
216         metric = {}
217         while not self._queue.empty():
218             metric.update(self._queue.get())
219         msg = self.parse_collectd_result(metric)
220         return msg
221
222     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
223         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
224                                                  nfvi_cfg).decode('utf-8')
225         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
226                                       finalize=finalize_for_yaml).render(
227             **template_kwargs)
228         # cfg_content = io.StringIO(template.format(**template_kwargs))
229         cfg_file = os.path.join(config_file_path, nfvi_cfg)
230         # must write as root, so use sudo
231         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
232
233     def _prepare_collectd_conf(self, config_file_path):
234         """ Prepare collectd conf """
235
236         kwargs = {
237             "interval": self.interval,
238             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
239             # Optional fields PortName is descriptive only, use whatever is present
240             "port_names": self.port_names,
241             # "ovs_bridge_interfaces": ["br-int"],
242             "plugins": self.plugins,
243         }
244         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
245
246     def _setup_intel_pmu(self, connection, bin_path):
247         pmu_event_path = os.path.join(bin_path, "pmu_event.json")
248         try:
249             self.plugins["intel_pmu"]["pmu_event_path"] = pmu_event_path
250         except KeyError:
251             # if intel_pmu is not a dict, force it into a dict
252             self.plugins["intel_pmu"] = {"pmu_event_path": pmu_event_path}
253         LOG.debug("Downloading event list for pmu_stats plugin")
254         cmd = 'cd {0}; PMU_EVENTS_PATH={1} python event_download_local.py'.format(
255             bin_path, pmu_event_path)
256         cmd = "sudo bash -c '{}'".format(cmd)
257         connection.execute(cmd)
258
259     def _setup_ovs_stats(self, connection):
260         try:
261             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
262         except KeyError:
263             # ovs_stats is not a dict
264             socket_path = self.OVS_SOCKET_PATH
265         status = connection.execute("test -S {}".format(socket_path))[0]
266         if status != 0:
267             LOG.error("cannot find OVS socket %s", socket_path)
268
269     def _start_collectd(self, connection, bin_path):
270         LOG.debug("Starting collectd to collect NFVi stats")
271         connection.execute('sudo pkill -x -9 collectd')
272         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
273         config_file_path = os.path.join(bin_path, "collectd", "etc")
274         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
275         if exit_status != 0:
276             LOG.warning("%s is not present disabling", collectd_path)
277             # disable auto-provisioning because it requires Internet access
278             # collectd_installer = os.path.join(bin_path, "collectd.sh")
279             # provision_tool(connection, collectd)
280             # http_proxy = os.environ.get('http_proxy', '')
281             # https_proxy = os.environ.get('https_proxy', '')
282             # connection.execute("sudo %s '%s' '%s'" % (
283             #     collectd_installer, http_proxy, https_proxy))
284             return
285         if "intel_pmu" in self.plugins:
286             self._setup_intel_pmu(connection, bin_path)
287         if "ovs_stats" in self.plugins:
288             self._setup_ovs_stats(connection)
289
290         LOG.debug("Starting collectd to collect NFVi stats")
291         # ensure collectd.conf.d exists to avoid error/warning
292         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
293         self._prepare_collectd_conf(config_file_path)
294
295         # Reset amqp queue
296         LOG.debug("reset and setup amqp to collect data from collectd")
297         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
298         connection.execute("sudo service rabbitmq-server start")
299         connection.execute("sudo rabbitmqctl stop_app")
300         connection.execute("sudo rabbitmqctl reset")
301         connection.execute("sudo rabbitmqctl start_app")
302         connection.execute("sudo service rabbitmq-server restart")
303
304         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
305         connection.execute("sudo rabbitmqctl delete_user guest")
306         connection.execute("sudo rabbitmqctl add_user admin admin")
307         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
308         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
309
310         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
311         # intel_pmu plug requires large numbers of files open, so try to set
312         # ulimit -n to a large value
313         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
314                            timeout=self.timeout)
315         LOG.debug("Done")
316
317     def initiate_systemagent(self, bin_path):
318         """ Start system agent for NFVi collection on host """
319         if self.enable:
320             try:
321                 self._start_collectd(self.connection, bin_path)
322             except Exception:
323                 LOG.exception("Exception during collectd start")
324                 raise
325
326     def start(self):
327         """ start nfvi collection """
328         if self.enable:
329             LOG.debug("Start NVFi metric collection...")
330
331     def stop(self):
332         """ stop nfvi collection """
333         if not self.enable:
334             return
335
336         agent = "collectd"
337         LOG.debug("Stop resource monitor...")
338
339         if self.amqp_client is not None:
340             # we proper and try to join first
341             self.amqp_client.join(3)
342             self.amqp_client.terminate()
343
344         LOG.debug("Check if %s is running", agent)
345         status, pid = self.check_if_sa_running(agent)
346         LOG.debug("status %s  pid %s", status, pid)
347         if status != 0:
348             return
349
350         if pid:
351             self.connection.execute('sudo kill -9 "%s"' % pid)
352         self.connection.execute('sudo pkill -9 "%s"' % agent)
353         self.connection.execute('sudo service rabbitmq-server stop')
354         self.connection.execute("sudo rabbitmqctl stop_app")