Merge "Replace "python-heatclient" with "shade" client"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 import errno
17 from itertools import chain
18 import logging
19 import multiprocessing
20 import os
21 import os.path
22 import re
23
24 import jinja2
25 import pkg_resources
26 from oslo_config import cfg
27 from oslo_utils.encodeutils import safe_decode
28
29 from yardstick import ssh
30 from yardstick.common.task_template import finalize_for_yaml
31 from yardstick.common.utils import validate_non_string_sequence
32 from yardstick.network_services.nfvi.collectd import AmqpConsumer
33
34
35 LOG = logging.getLogger(__name__)
36
37 CONF = cfg.CONF
38 ZMQ_OVS_PORT = 5567
39 ZMQ_POLLING_TIME = 12000
40 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
41                         "hugepages"]
42
43
44 class ResourceProfile(object):
45     """
46     This profile adds a resource at the beginning of the test session
47     """
48     COLLECTD_CONF = "collectd.conf"
49     AMPQ_PORT = 5672
50     DEFAULT_INTERVAL = 25
51     DEFAULT_TIMEOUT = 3600
52     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
53
54     def __init__(self, mgmt, port_names=None, plugins=None, interval=None, timeout=None):
55
56         if plugins is None:
57             self.plugins = {}
58         else:
59             self.plugins = plugins
60
61         if interval is None:
62             self.interval = self.DEFAULT_INTERVAL
63         else:
64             self.interval = interval
65
66         if timeout is None:
67             self.timeout = self.DEFAULT_TIMEOUT
68         else:
69             self.timeout = timeout
70
71         self.enable = True
72         self._queue = multiprocessing.Queue()
73         self.amqp_client = None
74         self.port_names = validate_non_string_sequence(port_names, default=[])
75
76         # we need to save mgmt so we can connect to port 5672
77         self.mgmt = mgmt
78         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
79
80     @classmethod
81     def make_from_node(cls, node, timeout):
82         # node dict works as mgmt dict
83         # don't need port names, there is no way we can
84         # tell what port is used on the compute node
85         collectd_options = node["collectd"]
86         plugins = collectd_options.get("plugins", {})
87         interval = collectd_options.get("interval")
88
89         return cls(node, plugins=plugins, interval=interval, timeout=timeout)
90
91     def check_if_system_agent_running(self, process):
92         """ verify if system agent is running """
93         try:
94             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
95             # strip whitespace
96             return err, pid.strip()
97         except OSError as e:
98             if e.errno in {errno.ECONNRESET}:
99                 # if we can't connect to check, then we won't be able to connect to stop it
100                 LOG.exception("Can't connect to host to check %s status", process)
101                 return 1, None
102             raise
103
104     def run_collectd_amqp(self):
105         """ run amqp consumer to collect the NFVi data """
106         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
107         amqp = AmqpConsumer(amqp_url, self._queue)
108         try:
109             amqp.run()
110         except (AttributeError, RuntimeError, KeyboardInterrupt):
111             amqp.stop()
112
113     @classmethod
114     def parse_simple_resource(cls, key, value):
115         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
116         return {reskey: value.split(":")[1]}
117
118     @classmethod
119     def get_cpu_data(cls, res_key0, res_key1, value):
120         """ Get cpu topology of the host """
121         pattern = r"-(\d+)"
122
123         if 'cpufreq' in res_key0:
124             metric, source = res_key0, res_key1
125         else:
126             metric, source = res_key1, res_key0
127
128         match = re.search(pattern, source, re.MULTILINE)
129         if not match:
130             return "error", "Invalid", "", ""
131
132         time, value = value.split(":")
133         return str(match.group(1)), metric, value, time
134
135     @classmethod
136     def parse_hugepages(cls, key, value):
137         return cls.parse_simple_resource(key, value)
138
139     @classmethod
140     def parse_dpdkstat(cls, key, value):
141         return cls.parse_simple_resource(key, value)
142
143     @classmethod
144     def parse_virt(cls, key, value):
145         return cls.parse_simple_resource(key, value)
146
147     @classmethod
148     def parse_ovs_stats(cls, key, value):
149         return cls.parse_simple_resource(key, value)
150
151     @classmethod
152     def parse_intel_pmu_stats(cls, key, value):
153         return {''.join(str(v) for v in key): value.split(":")[1]}
154
155     def parse_collectd_result(self, metrics):
156         """ convert collectd data into json"""
157         result = {
158             "cpu": {},
159             "memory": {},
160             "hugepages": {},
161             "dpdkstat": {},
162             "virt": {},
163             "ovs_stats": {},
164         }
165         testcase = ""
166
167         # unicode decode
168         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
169         for key, value in decoded:
170             key_split = key.split("/")
171             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
172             res_key0 = next(res_key_iter)
173             res_key1 = next(res_key_iter)
174
175             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
176                 cpu_key, name, metric, testcase = \
177                     self.get_cpu_data(res_key0, res_key1, value)
178                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
179
180             elif "memory" in res_key0:
181                 result["memory"].update({res_key1: value.split(":")[0]})
182
183             elif "hugepages" in res_key0:
184                 result["hugepages"].update(self.parse_hugepages(key_split, value))
185
186             elif "dpdkstat" in res_key0:
187                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
188
189             elif "virt" in res_key1:
190                 result["virt"].update(self.parse_virt(key_split, value))
191
192             elif "ovs_stats" in res_key0:
193                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
194
195         result["timestamp"] = testcase
196
197         return result
198
199     def amqp_process_for_nfvi_kpi(self):
200         """ amqp collect and return nfvi kpis """
201         if self.amqp_client is None and self.enable:
202             self.amqp_client = multiprocessing.Process(
203                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
204                 target=self.run_collectd_amqp)
205             self.amqp_client.start()
206
207     def amqp_collect_nfvi_kpi(self):
208         """ amqp collect and return nfvi kpis """
209         if not self.enable:
210             return {}
211
212         metric = {}
213         while not self._queue.empty():
214             metric.update(self._queue.get())
215         msg = self.parse_collectd_result(metric)
216         return msg
217
218     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
219         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
220                                                  nfvi_cfg).decode('utf-8')
221         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
222                                       finalize=finalize_for_yaml).render(
223             **template_kwargs)
224         # cfg_content = io.StringIO(template.format(**template_kwargs))
225         cfg_file = os.path.join(config_file_path, nfvi_cfg)
226         # must write as root, so use sudo
227         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
228
229     def _prepare_collectd_conf(self, config_file_path):
230         """ Prepare collectd conf """
231
232         kwargs = {
233             "interval": self.interval,
234             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
235             # Optional fields PortName is descriptive only, use whatever is present
236             "port_names": self.port_names,
237             # "ovs_bridge_interfaces": ["br-int"],
238             "plugins": self.plugins,
239         }
240         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
241
242     def _setup_ovs_stats(self, connection):
243         try:
244             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
245         except KeyError:
246             # ovs_stats is not a dict
247             socket_path = self.OVS_SOCKET_PATH
248         status = connection.execute("test -S {}".format(socket_path))[0]
249         if status != 0:
250             LOG.error("cannot find OVS socket %s", socket_path)
251
252     def _start_collectd(self, connection, bin_path):
253         LOG.debug("Starting collectd to collect NFVi stats")
254         connection.execute('sudo pkill -x -9 collectd')
255         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
256         config_file_path = os.path.join(bin_path, "collectd", "etc")
257         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
258         if exit_status != 0:
259             LOG.warning("%s is not present disabling", collectd_path)
260             # disable auto-provisioning because it requires Internet access
261             # collectd_installer = os.path.join(bin_path, "collectd.sh")
262             # provision_tool(connection, collectd)
263             # http_proxy = os.environ.get('http_proxy', '')
264             # https_proxy = os.environ.get('https_proxy', '')
265             # connection.execute("sudo %s '%s' '%s'" % (
266             #     collectd_installer, http_proxy, https_proxy))
267             return
268         if "ovs_stats" in self.plugins:
269             self._setup_ovs_stats(connection)
270
271         LOG.debug("Starting collectd to collect NFVi stats")
272         # ensure collectd.conf.d exists to avoid error/warning
273         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
274         self._prepare_collectd_conf(config_file_path)
275
276         # Reset amqp queue
277         LOG.debug("reset and setup amqp to collect data from collectd")
278         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
279         connection.execute("sudo service rabbitmq-server start")
280         connection.execute("sudo rabbitmqctl stop_app")
281         connection.execute("sudo rabbitmqctl reset")
282         connection.execute("sudo rabbitmqctl start_app")
283         connection.execute("sudo service rabbitmq-server restart")
284
285         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
286         connection.execute("sudo rabbitmqctl delete_user guest")
287         connection.execute("sudo rabbitmqctl add_user admin admin")
288         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
289         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
290
291         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
292         # intel_pmu plug requires large numbers of files open, so try to set
293         # ulimit -n to a large value
294         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
295                            timeout=self.timeout)
296         LOG.debug("Done")
297
298     def initiate_systemagent(self, bin_path):
299         """ Start system agent for NFVi collection on host """
300         if self.enable:
301             try:
302                 self._start_collectd(self.connection, bin_path)
303             except Exception:
304                 LOG.exception("Exception during collectd start")
305                 raise
306
307     def start(self):
308         """ start nfvi collection """
309         if self.enable:
310             LOG.debug("Start NVFi metric collection...")
311
312     def stop(self):
313         """ stop nfvi collection """
314         if not self.enable:
315             return
316
317         agent = "collectd"
318         LOG.debug("Stop resource monitor...")
319
320         if self.amqp_client is not None:
321             # we proper and try to join first
322             self.amqp_client.join(3)
323             self.amqp_client.terminate()
324
325         LOG.debug("Check if %s is running", agent)
326         status, pid = self.check_if_system_agent_running(agent)
327         LOG.debug("status %s  pid %s", status, pid)
328         if status != 0:
329             return
330
331         if pid:
332             self.connection.execute('sudo kill -9 "%s"' % pid)
333         self.connection.execute('sudo pkill -9 "%s"' % agent)
334         self.connection.execute('sudo service rabbitmq-server stop')
335         self.connection.execute("sudo rabbitmqctl stop_app")