Merge "Doc amendment: env prepare command"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19 import logging
20 from itertools import chain
21
22 import errno
23 import jinja2
24 import os
25 import os.path
26 import re
27 import multiprocessing
28 import pkg_resources
29
30 from oslo_config import cfg
31 from oslo_utils.encodeutils import safe_decode
32
33 from yardstick import ssh
34 from yardstick.common.task_template import finalize_for_yaml
35 from yardstick.common.utils import validate_non_string_sequence
36 from yardstick.network_services.nfvi.collectd import AmqpConsumer
37 from yardstick.network_services.utils import get_nsb_option
38
39 LOG = logging.getLogger(__name__)
40
41 CONF = cfg.CONF
42 ZMQ_OVS_PORT = 5567
43 ZMQ_POLLING_TIME = 12000
44 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
45                         "hugepages"]
46
47
48 class ResourceProfile(object):
49     """
50     This profile adds a resource at the beginning of the test session
51     """
52     COLLECTD_CONF = "collectd.conf"
53     AMPQ_PORT = 5672
54     DEFAULT_INTERVAL = 25
55     DEFAULT_TIMEOUT = 3600
56
57     def __init__(self, mgmt, port_names=None, cores=None, plugins=None,
58                  interval=None, timeout=None):
59
60         if plugins is None:
61             self.plugins = {}
62         else:
63             self.plugins = plugins
64
65         if interval is None:
66             self.interval = self.DEFAULT_INTERVAL
67         else:
68             self.interval = interval
69
70         if timeout is None:
71             self.timeout = self.DEFAULT_TIMEOUT
72         else:
73             self.timeout = timeout
74
75         self.enable = True
76         self._queue = multiprocessing.Queue()
77         self.amqp_client = None
78         self.port_names = validate_non_string_sequence(port_names, default=[])
79
80         # we need to save mgmt so we can connect to port 5672
81         self.mgmt = mgmt
82         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
83
84     def check_if_sa_running(self, process):
85         """ verify if system agent is running """
86         try:
87             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
88             # strip whitespace
89             return err, pid.strip()
90         except OSError as e:
91             if e.errno in {errno.ECONNRESET}:
92                 # if we can't connect to check, then we won't be able to connect to stop it
93                 LOG.exception("can't connect to host to check collectd status")
94                 return 1, None
95             raise
96
97     def run_collectd_amqp(self):
98         """ run amqp consumer to collect the NFVi data """
99         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
100         amqp = AmqpConsumer(amqp_url, self._queue)
101         try:
102             amqp.run()
103         except (AttributeError, RuntimeError, KeyboardInterrupt):
104             amqp.stop()
105
106     @classmethod
107     def parse_simple_resource(cls, key, value):
108         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
109         return {reskey: value.split(":")[1]}
110
111     @classmethod
112     def get_cpu_data(cls, res_key0, res_key1, value):
113         """ Get cpu topology of the host """
114         pattern = r"-(\d+)"
115
116         if 'cpufreq' in res_key0:
117             metric, source = res_key0, res_key1
118         else:
119             metric, source = res_key1, res_key0
120
121         match = re.search(pattern, source, re.MULTILINE)
122         if not match:
123             return "error", "Invalid", "", ""
124
125         time, value = value.split(":")
126         return str(match.group(1)), metric, value, time
127
128     @classmethod
129     def parse_hugepages(cls, key, value):
130         return cls.parse_simple_resource(key, value)
131
132     @classmethod
133     def parse_dpdkstat(cls, key, value):
134         return cls.parse_simple_resource(key, value)
135
136     @classmethod
137     def parse_virt(cls, key, value):
138         return cls.parse_simple_resource(key, value)
139
140     @classmethod
141     def parse_ovs_stats(cls, key, value):
142         return cls.parse_simple_resource(key, value)
143
144     @classmethod
145     def parse_intel_pmu_stats(cls, key, value):
146         return {''.join(str(v) for v in key): value.split(":")[1]}
147
148     def parse_collectd_result(self, metrics):
149         """ convert collectd data into json"""
150         result = {
151             "cpu": {},
152             "memory": {},
153             "hugepages": {},
154             "dpdkstat": {},
155             "virt": {},
156             "ovs_stats": {},
157             "intel_pmu": {},
158         }
159         testcase = ""
160
161         # unicode decode
162         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
163         for key, value in decoded:
164             key_split = key.split("/")
165             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
166             res_key0 = next(res_key_iter)
167             res_key1 = next(res_key_iter)
168
169             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
170                 cpu_key, name, metric, testcase = \
171                     self.get_cpu_data(res_key0, res_key1, value)
172                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
173
174             elif "memory" in res_key0:
175                 result["memory"].update({res_key1: value.split(":")[0]})
176
177             elif "hugepages" in res_key0:
178                 result["hugepages"].update(self.parse_hugepages(key_split, value))
179
180             elif "dpdkstat" in res_key0:
181                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
182
183             elif "virt" in res_key1:
184                 result["virt"].update(self.parse_virt(key_split, value))
185
186             elif "ovs_stats" in res_key0:
187                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
188
189         result["timestamp"] = testcase
190
191         return result
192
193     def amqp_process_for_nfvi_kpi(self):
194         """ amqp collect and return nfvi kpis """
195         if self.amqp_client is None and self.enable:
196             self.amqp_client = multiprocessing.Process(
197                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
198                 target=self.run_collectd_amqp)
199             self.amqp_client.start()
200
201     def amqp_collect_nfvi_kpi(self):
202         """ amqp collect and return nfvi kpis """
203         if not self.enable:
204             return {}
205
206         metric = {}
207         while not self._queue.empty():
208             metric.update(self._queue.get())
209         msg = self.parse_collectd_result(metric)
210         return msg
211
212     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
213         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
214                                                  nfvi_cfg).decode('utf-8')
215         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
216                                       finalize=finalize_for_yaml).render(
217             **template_kwargs)
218         # cfg_content = io.StringIO(template.format(**template_kwargs))
219         cfg_file = os.path.join(config_file_path, nfvi_cfg)
220         # must write as root, so use sudo
221         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
222
223     def _prepare_collectd_conf(self, config_file_path):
224         """ Prepare collectd conf """
225
226         kwargs = {
227             "interval": self.interval,
228             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
229             # Optional fields PortName is descriptive only, use whatever is present
230             "port_names": self.port_names,
231             # "ovs_bridge_interfaces": ["br-int"],
232             "plugins": self.plugins,
233         }
234         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
235
236     def _start_collectd(self, connection, bin_path):
237         LOG.debug("Starting collectd to collect NFVi stats")
238         connection.execute('sudo pkill -x -9 collectd')
239         bin_path = get_nsb_option("bin_path")
240         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
241         config_file_path = os.path.join(bin_path, "collectd", "etc")
242         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
243         if exit_status != 0:
244             LOG.warning("%s is not present disabling", collectd_path)
245             # disable auto-provisioning because it requires Internet access
246             # collectd_installer = os.path.join(bin_path, "collectd.sh")
247             # provision_tool(connection, collectd)
248             # http_proxy = os.environ.get('http_proxy', '')
249             # https_proxy = os.environ.get('https_proxy', '')
250             # connection.execute("sudo %s '%s' '%s'" % (
251             #     collectd_installer, http_proxy, https_proxy))
252             return
253         if "intel_pmu" in self.plugins:
254             LOG.debug("Downloading event list for pmu_stats plugin")
255             cmd = 'sudo bash -c \'cd /opt/tempT/pmu-tools/; python event_download_local.py\''
256             connection.execute(cmd)
257         LOG.debug("Starting collectd to collect NFVi stats")
258         # ensure collectd.conf.d exists to avoid error/warning
259         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
260         self._prepare_collectd_conf(config_file_path)
261
262         # Reset amqp queue
263         LOG.debug("reset and setup amqp to collect data from collectd")
264         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
265         connection.execute("sudo service rabbitmq-server start")
266         connection.execute("sudo rabbitmqctl stop_app")
267         connection.execute("sudo rabbitmqctl reset")
268         connection.execute("sudo rabbitmqctl start_app")
269         connection.execute("sudo service rabbitmq-server restart")
270
271         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
272         connection.execute("sudo rabbitmqctl delete_user guest")
273         connection.execute("sudo rabbitmqctl add_user admin admin")
274         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
275         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
276
277         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
278         # intel_pmu plug requires large numbers of files open, so try to set
279         # ulimit -n to a large value
280         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
281                            timeout=self.timeout)
282         LOG.debug("Done")
283
284     def initiate_systemagent(self, bin_path):
285         """ Start system agent for NFVi collection on host """
286         if self.enable:
287             try:
288                 self._start_collectd(self.connection, bin_path)
289             except Exception:
290                 LOG.exception("Exception during collectd start")
291                 raise
292
293     def start(self):
294         """ start nfvi collection """
295         if self.enable:
296             LOG.debug("Start NVFi metric collection...")
297
298     def stop(self):
299         """ stop nfvi collection """
300         if not self.enable:
301             return
302
303         agent = "collectd"
304         LOG.debug("Stop resource monitor...")
305
306         if self.amqp_client is not None:
307             # we proper and try to join first
308             self.amqp_client.join(3)
309             self.amqp_client.terminate()
310
311         LOG.debug("Check if %s is running", agent)
312         status, pid = self.check_if_sa_running(agent)
313         LOG.debug("status %s  pid %s", status, pid)
314         if status != 0:
315             return
316
317         if pid:
318             self.connection.execute('sudo kill -9 "%s"' % pid)
319         self.connection.execute('sudo pkill -9 "%s"' % agent)
320         self.connection.execute('sudo service rabbitmq-server stop')
321         self.connection.execute("sudo rabbitmqctl stop_app")