Merge "Adding scale out templates for ovs_dpdk/sriov using 2 node setup"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19 import logging
20 from itertools import chain
21
22 import errno
23 import jinja2
24 import os
25 import os.path
26 import re
27 import multiprocessing
28 import pkg_resources
29
30 from oslo_config import cfg
31 from oslo_utils.encodeutils import safe_decode
32
33 from yardstick import ssh
34 from yardstick.common.task_template import finalize_for_yaml
35 from yardstick.common.utils import validate_non_string_sequence
36 from yardstick.network_services.nfvi.collectd import AmqpConsumer
37 from yardstick.network_services.utils import get_nsb_option
38
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Global oslo.config configuration object.
CONF = cfg.CONF
# NOTE(review): ZMQ_OVS_PORT and ZMQ_POLLING_TIME are not referenced in the
# visible part of this module -- confirm they are still needed.
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
# collectd plugins that are always loaded, in addition to the plugins given to
# ResourceProfile (they are merged in ResourceProfile._prepare_collectd_conf).
LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
                        "hugepages"]
46
47
class ResourceProfile(object):
    """
    Collect NFVi resource metrics from a node through collectd and AMQP.

    The profile opens an SSH connection to the management node, can configure
    and start collectd and rabbitmq there, consumes the published metrics in
    a separate process and converts them into a nested dict of KPIs.
    """
    # name of the packaged collectd configuration template and of the file
    # written on the node
    COLLECTD_CONF = "collectd.conf"
    # port used to build the AMQP URL in run_collectd_amqp
    # (the attribute keeps its historical "AMPQ" spelling)
    AMPQ_PORT = 5672
    # used when no interval is given to __init__; handed to the collectd
    # configuration template -- presumably seconds, TODO confirm
    DEFAULT_INTERVAL = 25
    # used when no timeout is given to __init__; passed as the timeout of the
    # command that launches collectd
    DEFAULT_TIMEOUT = 3600
56
57     def __init__(self, mgmt, port_names=None, cores=None, plugins=None,
58                  interval=None, timeout=None):
59
60         if plugins is None:
61             self.plugins = {}
62         else:
63             self.plugins = plugins
64
65         if interval is None:
66             self.interval = self.DEFAULT_INTERVAL
67         else:
68             self.interval = interval
69
70         if timeout is None:
71             self.timeout = self.DEFAULT_TIMEOUT
72         else:
73             self.timeout = timeout
74
75         self.enable = True
76         self._queue = multiprocessing.Queue()
77         self.amqp_client = None
78         self.port_names = validate_non_string_sequence(port_names, default=[])
79
80         # we need to save mgmt so we can connect to port 5672
81         self.mgmt = mgmt
82         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
83
84     def check_if_sa_running(self, process):
85         """ verify if system agent is running """
86         try:
87             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
88             # strip whitespace
89             return err, pid.strip()
90         except OSError as e:
91             if e.errno in {errno.ECONNRESET}:
92                 # if we can't connect to check, then we won't be able to connect to stop it
93                 LOG.exception("can't connect to host to check collectd status")
94                 return 1, None
95             raise
96
97     def run_collectd_amqp(self):
98         """ run amqp consumer to collect the NFVi data """
99         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
100         amqp = AmqpConsumer(amqp_url, self._queue)
101         try:
102             amqp.run()
103         except (AttributeError, RuntimeError, KeyboardInterrupt):
104             amqp.stop()
105
106     @classmethod
107     def parse_simple_resource(cls, key, value):
108         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
109         return {reskey: value.split(":")[1]}
110
111     @classmethod
112     def get_cpu_data(cls, res_key0, res_key1, value):
113         """ Get cpu topology of the host """
114         pattern = r"-(\d+)"
115
116         if 'cpufreq' in res_key0:
117             metric, source = res_key0, res_key1
118         else:
119             metric, source = res_key1, res_key0
120
121         match = re.search(pattern, source, re.MULTILINE)
122         if not match:
123             return "error", "Invalid", "", ""
124
125         time, value = value.split(":")
126         return str(match.group(1)), metric, value, time
127
128     @classmethod
129     def parse_hugepages(cls, key, value):
130         return cls.parse_simple_resource(key, value)
131
132     @classmethod
133     def parse_dpdkstat(cls, key, value):
134         return cls.parse_simple_resource(key, value)
135
136     @classmethod
137     def parse_virt(cls, key, value):
138         return cls.parse_simple_resource(key, value)
139
140     @classmethod
141     def parse_ovs_stats(cls, key, value):
142         return cls.parse_simple_resource(key, value)
143
144     @classmethod
145     def parse_intel_pmu_stats(cls, key, value):
146         return {''.join(str(v) for v in key): value.split(":")[1]}
147
148     def parse_collectd_result(self, metrics):
149         """ convert collectd data into json"""
150         result = {
151             "cpu": {},
152             "memory": {},
153             "hugepages": {},
154             "dpdkstat": {},
155             "virt": {},
156             "ovs_stats": {},
157             "intel_pmu": {},
158         }
159         testcase = ""
160
161         # unicode decode
162         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
163         for key, value in decoded:
164             key_split = key.split("/")
165             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
166             res_key0 = next(res_key_iter)
167             res_key1 = next(res_key_iter)
168
169             if "cpu" in res_key0 or "intel_rdt" in res_key0:
170                 cpu_key, name, metric, testcase = \
171                     self.get_cpu_data(res_key0, res_key1, value)
172                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
173
174             elif "memory" in res_key0:
175                 result["memory"].update({res_key1: value.split(":")[0]})
176
177             elif "hugepages" in res_key0:
178                 result["hugepages"].update(self.parse_hugepages(key_split, value))
179
180             elif "dpdkstat" in res_key0:
181                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
182
183             elif "virt" in res_key1:
184                 result["virt"].update(self.parse_virt(key_split, value))
185
186             elif "ovs_stats" in res_key0:
187                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
188
189             elif "intel_pmu-all" in res_key0:
190                 result["intel_pmu"].update(self.parse_intel_pmu_stats(res_key1, value))
191
192         result["timestamp"] = testcase
193
194         return result
195
196     def amqp_process_for_nfvi_kpi(self):
197         """ amqp collect and return nfvi kpis """
198         if self.amqp_client is None and self.enable:
199             self.amqp_client = multiprocessing.Process(
200                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
201                 target=self.run_collectd_amqp)
202             self.amqp_client.start()
203
204     def amqp_collect_nfvi_kpi(self):
205         """ amqp collect and return nfvi kpis """
206         if not self.enable:
207             return {}
208
209         metric = {}
210         while not self._queue.empty():
211             metric.update(self._queue.get())
212         msg = self.parse_collectd_result(metric)
213         return msg
214
215     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
216         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
217                                                  nfvi_cfg).decode('utf-8')
218         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
219                                       finalize=finalize_for_yaml).render(
220             **template_kwargs)
221         # cfg_content = io.StringIO(template.format(**template_kwargs))
222         cfg_file = os.path.join(config_file_path, nfvi_cfg)
223         # must write as root, so use sudo
224         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
225
226     def _prepare_collectd_conf(self, config_file_path):
227         """ Prepare collectd conf """
228
229         kwargs = {
230             "interval": self.interval,
231             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
232             # Optional fields PortName is descriptive only, use whatever is present
233             "port_names": self.port_names,
234             # "ovs_bridge_interfaces": ["br-int"],
235             "plugins": self.plugins,
236         }
237         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
238
239     def _start_collectd(self, connection, bin_path):
240         LOG.debug("Starting collectd to collect NFVi stats")
241         connection.execute('sudo pkill -x -9 collectd')
242         bin_path = get_nsb_option("bin_path")
243         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
244         config_file_path = os.path.join(bin_path, "collectd", "etc")
245         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
246         if exit_status != 0:
247             LOG.warning("%s is not present disabling", collectd_path)
248             # disable auto-provisioning because it requires Internet access
249             # collectd_installer = os.path.join(bin_path, "collectd.sh")
250             # provision_tool(connection, collectd)
251             # http_proxy = os.environ.get('http_proxy', '')
252             # https_proxy = os.environ.get('https_proxy', '')
253             # connection.execute("sudo %s '%s' '%s'" % (
254             #     collectd_installer, http_proxy, https_proxy))
255             return
256         LOG.debug("Starting collectd to collect NFVi stats")
257         # ensure collectd.conf.d exists to avoid error/warning
258         connection.execute("sudo mkdir -p /etc/collectd/collectd.conf.d")
259         self._prepare_collectd_conf(config_file_path)
260
261         # Reset amqp queue
262         LOG.debug("reset and setup amqp to collect data from collectd")
263         connection.execute("sudo rm -rf /var/lib/rabbitmq/mnesia/rabbit*")
264         connection.execute("sudo service rabbitmq-server start")
265         connection.execute("sudo rabbitmqctl stop_app")
266         connection.execute("sudo rabbitmqctl reset")
267         connection.execute("sudo rabbitmqctl start_app")
268         connection.execute("sudo service rabbitmq-server restart")
269
270         LOG.debug("Creating admin user for rabbitmq in order to collect data from collectd")
271         connection.execute("sudo rabbitmqctl delete_user guest")
272         connection.execute("sudo rabbitmqctl add_user admin admin")
273         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
274         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
275
276         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
277         connection.execute("sudo %s" % collectd_path, timeout=self.timeout)
278         LOG.debug("Done")
279
280     def initiate_systemagent(self, bin_path):
281         """ Start system agent for NFVi collection on host """
282         if self.enable:
283             try:
284                 self._start_collectd(self.connection, bin_path)
285             except Exception:
286                 LOG.exception("Exception during collectd start")
287                 raise
288
289     def start(self):
290         """ start nfvi collection """
291         if self.enable:
292             LOG.debug("Start NVFi metric collection...")
293
294     def stop(self):
295         """ stop nfvi collection """
296         if not self.enable:
297             return
298
299         agent = "collectd"
300         LOG.debug("Stop resource monitor...")
301
302         if self.amqp_client is not None:
303             # we proper and try to join first
304             self.amqp_client.join(3)
305             self.amqp_client.terminate()
306
307         LOG.debug("Check if %s is running", agent)
308         status, pid = self.check_if_sa_running(agent)
309         LOG.debug("status %s  pid %s", status, pid)
310         if status != 0:
311             return
312
313         if pid:
314             self.connection.execute('sudo kill -9 "%s"' % pid)
315         self.connection.execute('sudo pkill -9 "%s"' % agent)
316         self.connection.execute('sudo service rabbitmq-server stop')
317         self.connection.execute("sudo rabbitmqctl stop_app")