Merge "Remove network_services.vnf_generic.vnf.prox_helpers.ProxSocketHelper.rx_stats"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19 import logging
20 from itertools import chain
21
22 import errno
23 import jinja2
24 import os
25 import os.path
26 import re
27 import multiprocessing
28 import pkg_resources
29
30 from oslo_config import cfg
31 from oslo_utils.encodeutils import safe_decode
32
33 from yardstick import ssh
34 from yardstick.common.task_template import finalize_for_yaml
35 from yardstick.common.utils import validate_non_string_sequence
36 from yardstick.network_services.nfvi.collectd import AmqpConsumer
37
38
39 LOG = logging.getLogger(__name__)
40
41 CONF = cfg.CONF
42 ZMQ_OVS_PORT = 5567
43 ZMQ_POLLING_TIME = 12000
44 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
45                         "hugepages"]
46
47
48 class ResourceProfile(object):
49     """
50     This profile adds a resource at the beginning of the test session
51     """
52     COLLECTD_CONF = "collectd.conf"
53     AMPQ_PORT = 5672
54     DEFAULT_INTERVAL = 25
55     DEFAULT_TIMEOUT = 3600
56     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
57
58     def __init__(self, mgmt, port_names=None, plugins=None, interval=None, timeout=None):
59
60         if plugins is None:
61             self.plugins = {}
62         else:
63             self.plugins = plugins
64
65         if interval is None:
66             self.interval = self.DEFAULT_INTERVAL
67         else:
68             self.interval = interval
69
70         if timeout is None:
71             self.timeout = self.DEFAULT_TIMEOUT
72         else:
73             self.timeout = timeout
74
75         self.enable = True
76         self._queue = multiprocessing.Queue()
77         self.amqp_client = None
78         self.port_names = validate_non_string_sequence(port_names, default=[])
79
80         # we need to save mgmt so we can connect to port 5672
81         self.mgmt = mgmt
82         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
83
84     @classmethod
85     def make_from_node(cls, node, timeout):
86         # node dict works as mgmt dict
87         # don't need port names, there is no way we can
88         # tell what port is used on the compute node
89         collectd_options = node["collectd"]
90         plugins = collectd_options.get("plugins", {})
91         interval = collectd_options.get("interval")
92
93         return cls(node, plugins=plugins, interval=interval, timeout=timeout)
94
95     def check_if_sa_running(self, process):
96         """ verify if system agent is running """
97         try:
98             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
99             # strip whitespace
100             return err, pid.strip()
101         except OSError as e:
102             if e.errno in {errno.ECONNRESET}:
103                 # if we can't connect to check, then we won't be able to connect to stop it
104                 LOG.exception("can't connect to host to check collectd status")
105                 return 1, None
106             raise
107
108     def run_collectd_amqp(self):
109         """ run amqp consumer to collect the NFVi data """
110         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
111         amqp = AmqpConsumer(amqp_url, self._queue)
112         try:
113             amqp.run()
114         except (AttributeError, RuntimeError, KeyboardInterrupt):
115             amqp.stop()
116
117     @classmethod
118     def parse_simple_resource(cls, key, value):
119         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
120         return {reskey: value.split(":")[1]}
121
122     @classmethod
123     def get_cpu_data(cls, res_key0, res_key1, value):
124         """ Get cpu topology of the host """
125         pattern = r"-(\d+)"
126
127         if 'cpufreq' in res_key0:
128             metric, source = res_key0, res_key1
129         else:
130             metric, source = res_key1, res_key0
131
132         match = re.search(pattern, source, re.MULTILINE)
133         if not match:
134             return "error", "Invalid", "", ""
135
136         time, value = value.split(":")
137         return str(match.group(1)), metric, value, time
138
139     @classmethod
140     def parse_hugepages(cls, key, value):
141         return cls.parse_simple_resource(key, value)
142
143     @classmethod
144     def parse_dpdkstat(cls, key, value):
145         return cls.parse_simple_resource(key, value)
146
147     @classmethod
148     def parse_virt(cls, key, value):
149         return cls.parse_simple_resource(key, value)
150
151     @classmethod
152     def parse_ovs_stats(cls, key, value):
153         return cls.parse_simple_resource(key, value)
154
155     @classmethod
156     def parse_intel_pmu_stats(cls, key, value):
157         return {''.join(str(v) for v in key): value.split(":")[1]}
158
159     def parse_collectd_result(self, metrics):
160         """ convert collectd data into json"""
161         result = {
162             "cpu": {},
163             "memory": {},
164             "hugepages": {},
165             "dpdkstat": {},
166             "virt": {},
167             "ovs_stats": {},
168         }
169         testcase = ""
170
171         # unicode decode
172         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
173         for key, value in decoded:
174             key_split = key.split("/")
175             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
176             res_key0 = next(res_key_iter)
177             res_key1 = next(res_key_iter)
178
179             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
180                 cpu_key, name, metric, testcase = \
181                     self.get_cpu_data(res_key0, res_key1, value)
182                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
183
184             elif "memory" in res_key0:
185                 result["memory"].update({res_key1: value.split(":")[0]})
186
187             elif "hugepages" in res_key0:
188                 result["hugepages"].update(self.parse_hugepages(key_split, value))
189
190             elif "dpdkstat" in res_key0:
191                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
192
193             elif "virt" in res_key1:
194                 result["virt"].update(self.parse_virt(key_split, value))
195
196             elif "ovs_stats" in res_key0:
197                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
198
199         result["timestamp"] = testcase
200
201         return result
202
203     def amqp_process_for_nfvi_kpi(self):
204         """ amqp collect and return nfvi kpis """
205         if self.amqp_client is None and self.enable:
206             self.amqp_client = multiprocessing.Process(
207                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
208                 target=self.run_collectd_amqp)
209             self.amqp_client.start()
210
211     def amqp_collect_nfvi_kpi(self):
212         """ amqp collect and return nfvi kpis """
213         if not self.enable:
214             return {}
215
216         metric = {}
217         while not self._queue.empty():
218             metric.update(self._queue.get())
219         msg = self.parse_collectd_result(metric)
220         return msg
221
222     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
223         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
224                                                  nfvi_cfg).decode('utf-8')
225         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
226                                       finalize=finalize_for_yaml).render(
227             **template_kwargs)
228         # cfg_content = io.StringIO(template.format(**template_kwargs))
229         cfg_file = os.path.join(config_file_path, nfvi_cfg)
230         # must write as root, so use sudo
231         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
232
233     def _prepare_collectd_conf(self, config_file_path):
234         """ Prepare collectd conf """
235
236         kwargs = {
237             "interval": self.interval,
238             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
239             # Optional fields PortName is descriptive only, use whatever is present
240             "port_names": self.port_names,
241             # "ovs_bridge_interfaces": ["br-int"],
242             "plugins": self.plugins,
243         }
244         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
245
246     def _setup_ovs_stats(self, connection):
247         try:
248             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
249         except KeyError:
250             # ovs_stats is not a dict
251             socket_path = self.OVS_SOCKET_PATH
252         status = connection.execute("test -S {}".format(socket_path))[0]
253         if status != 0:
254             LOG.error("cannot find OVS socket %s", socket_path)
255
256     def _start_collectd(self, connection, bin_path):
257         LOG.debug("Starting collectd to collect NFVi stats")
258         connection.execute('sudo pkill -x -9 collectd')
259         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
260         config_file_path = os.path.join(bin_path, "collectd", "etc")
261         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
262         if exit_status != 0:
263             LOG.warning("%s is not present disabling", collectd_path)
264             # disable auto-provisioning because it requires Internet access
265             # collectd_installer = os.path.join(bin_path, "collectd.sh")
266             # provision_tool(connection, collectd)
267             # http_proxy = os.environ.get('http_proxy', '')
268             # https_proxy = os.environ.get('https_proxy', '')
269             # connection.execute("sudo %s '%s' '%s'" % (
270             #     collectd_installer, http_proxy, https_proxy))
271             return
272         if "ovs_stats" in self.plugins:
273             self._setup_ovs_stats(connection)
274
275         LOG.debug("Starting collectd to collect NFVi stats")
276         # ensure collectd.conf.d exists to avoid error/warning
277         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
278         self._prepare_collectd_conf(config_file_path)
279
280         # Reset amqp queue
281         LOG.debug("reset and setup amqp to collect data from collectd")
282         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
283         connection.execute("sudo service rabbitmq-server start")
284         connection.execute("sudo rabbitmqctl stop_app")
285         connection.execute("sudo rabbitmqctl reset")
286         connection.execute("sudo rabbitmqctl start_app")
287         connection.execute("sudo service rabbitmq-server restart")
288
289         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
290         connection.execute("sudo rabbitmqctl delete_user guest")
291         connection.execute("sudo rabbitmqctl add_user admin admin")
292         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
293         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
294
295         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
296         # intel_pmu plug requires large numbers of files open, so try to set
297         # ulimit -n to a large value
298         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
299                            timeout=self.timeout)
300         LOG.debug("Done")
301
302     def initiate_systemagent(self, bin_path):
303         """ Start system agent for NFVi collection on host """
304         if self.enable:
305             try:
306                 self._start_collectd(self.connection, bin_path)
307             except Exception:
308                 LOG.exception("Exception during collectd start")
309                 raise
310
311     def start(self):
312         """ start nfvi collection """
313         if self.enable:
314             LOG.debug("Start NVFi metric collection...")
315
316     def stop(self):
317         """ stop nfvi collection """
318         if not self.enable:
319             return
320
321         agent = "collectd"
322         LOG.debug("Stop resource monitor...")
323
324         if self.amqp_client is not None:
325             # we proper and try to join first
326             self.amqp_client.join(3)
327             self.amqp_client.terminate()
328
329         LOG.debug("Check if %s is running", agent)
330         status, pid = self.check_if_sa_running(agent)
331         LOG.debug("status %s  pid %s", status, pid)
332         if status != 0:
333             return
334
335         if pid:
336             self.connection.execute('sudo kill -9 "%s"' % pid)
337         self.connection.execute('sudo pkill -9 "%s"' % agent)
338         self.connection.execute('sudo service rabbitmq-server stop')
339         self.connection.execute("sudo rabbitmqctl stop_app")