Merge "Test case spec for SDN Virtual Switch resilience."
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 import errno
17 from itertools import chain
18 import logging
19 import multiprocessing
20 import os
21 import os.path
22 import re
23
24 import jinja2
25 import pkg_resources
26 from oslo_config import cfg
27 from oslo_utils.encodeutils import safe_decode
28
29 from yardstick import ssh
30 from yardstick.common.exceptions import ResourceCommandError
31 from yardstick.common.task_template import finalize_for_yaml
32 from yardstick.common.utils import validate_non_string_sequence
33 from yardstick.network_services.nfvi.collectd import AmqpConsumer
34
35
36 LOG = logging.getLogger(__name__)
37
38 CONF = cfg.CONF
39 ZMQ_OVS_PORT = 5567
40 ZMQ_POLLING_TIME = 12000
41 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "memory",
42                         "hugepages"]
43
44
45 class ResourceProfile(object):
46     """
47     This profile adds a resource at the beginning of the test session
48     """
49     COLLECTD_CONF = "collectd.conf"
50     AMPQ_PORT = 5672
51     DEFAULT_INTERVAL = 25
52     DEFAULT_TIMEOUT = 3600
53     OVS_SOCKET_PATH = "/usr/local/var/run/openvswitch/db.sock"
54
55     def __init__(self, mgmt, port_names=None, plugins=None, interval=None, timeout=None):
56
57         if plugins is None:
58             self.plugins = {}
59         else:
60             self.plugins = plugins
61
62         if interval is None:
63             self.interval = self.DEFAULT_INTERVAL
64         else:
65             self.interval = interval
66
67         if timeout is None:
68             self.timeout = self.DEFAULT_TIMEOUT
69         else:
70             self.timeout = timeout
71
72         self.enable = True
73         self._queue = multiprocessing.Queue()
74         self.amqp_client = None
75         self.port_names = validate_non_string_sequence(port_names, default=[])
76
77         # we need to save mgmt so we can connect to port 5672
78         self.mgmt = mgmt
79         self.connection = ssh.AutoConnectSSH.from_node(mgmt)
80
81     @classmethod
82     def make_from_node(cls, node, timeout):
83         # node dict works as mgmt dict
84         # don't need port names, there is no way we can
85         # tell what port is used on the compute node
86         collectd_options = node["collectd"]
87         plugins = collectd_options.get("plugins", {})
88         interval = collectd_options.get("interval")
89
90         return cls(node, plugins=plugins, interval=interval, timeout=timeout)
91
92     def check_if_system_agent_running(self, process):
93         """ verify if system agent is running """
94         try:
95             err, pid, _ = self.connection.execute("pgrep -f %s" % process)
96             # strip whitespace
97             return err, pid.strip()
98         except OSError as e:
99             if e.errno in {errno.ECONNRESET}:
100                 # if we can't connect to check, then we won't be able to connect to stop it
101                 LOG.exception("Can't connect to host to check %s status", process)
102                 return 1, None
103             raise
104
105     def run_collectd_amqp(self):
106         """ run amqp consumer to collect the NFVi data """
107         amqp_url = 'amqp://admin:admin@{}:{}/%2F'.format(self.mgmt['ip'], self.AMPQ_PORT)
108         amqp = AmqpConsumer(amqp_url, self._queue)
109         try:
110             amqp.run()
111         except (AttributeError, RuntimeError, KeyboardInterrupt):
112             amqp.stop()
113
114     @classmethod
115     def parse_simple_resource(cls, key, value):
116         reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
117         return {reskey: value.split(":")[1]}
118
119     @classmethod
120     def get_cpu_data(cls, res_key0, res_key1, value):
121         """ Get cpu topology of the host """
122         pattern = r"-(\d+)"
123
124         if 'cpufreq' in res_key0:
125             metric, source = res_key0, res_key1
126         else:
127             metric, source = res_key1, res_key0
128
129         match = re.search(pattern, source, re.MULTILINE)
130         if not match:
131             return "error", "Invalid", "", ""
132
133         time, value = value.split(":")
134         return str(match.group(1)), metric, value, time
135
136     @classmethod
137     def parse_hugepages(cls, key, value):
138         return cls.parse_simple_resource(key, value)
139
140     @classmethod
141     def parse_dpdkstat(cls, key, value):
142         return cls.parse_simple_resource(key, value)
143
144     @classmethod
145     def parse_virt(cls, key, value):
146         return cls.parse_simple_resource(key, value)
147
148     @classmethod
149     def parse_ovs_stats(cls, key, value):
150         return cls.parse_simple_resource(key, value)
151
152     @classmethod
153     def parse_intel_pmu_stats(cls, key, value):
154         return {''.join(str(v) for v in key): value.split(":")[1]}
155
156     def parse_collectd_result(self, metrics):
157         """ convert collectd data into json"""
158         result = {
159             "cpu": {},
160             "memory": {},
161             "hugepages": {},
162             "dpdkstat": {},
163             "virt": {},
164             "ovs_stats": {},
165         }
166         testcase = ""
167
168         # unicode decode
169         decoded = ((safe_decode(k, 'utf-8'), safe_decode(v, 'utf-8')) for k, v in metrics.items())
170         for key, value in decoded:
171             key_split = key.split("/")
172             res_key_iter = (key for key in key_split if "nsb_stats" not in key)
173             res_key0 = next(res_key_iter)
174             res_key1 = next(res_key_iter)
175
176             if "cpu" in res_key0 or "intel_rdt" in res_key0 or "intel_pmu" in res_key0:
177                 cpu_key, name, metric, testcase = \
178                     self.get_cpu_data(res_key0, res_key1, value)
179                 result["cpu"].setdefault(cpu_key, {}).update({name: metric})
180
181             elif "memory" in res_key0:
182                 result["memory"].update({res_key1: value.split(":")[0]})
183
184             elif "hugepages" in res_key0:
185                 result["hugepages"].update(self.parse_hugepages(key_split, value))
186
187             elif "dpdkstat" in res_key0:
188                 result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
189
190             elif "virt" in res_key1:
191                 result["virt"].update(self.parse_virt(key_split, value))
192
193             elif "ovs_stats" in res_key0:
194                 result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
195
196         result["timestamp"] = testcase
197
198         return result
199
200     def amqp_process_for_nfvi_kpi(self):
201         """ amqp collect and return nfvi kpis """
202         if self.amqp_client is None and self.enable:
203             self.amqp_client = multiprocessing.Process(
204                 name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
205                 target=self.run_collectd_amqp)
206             self.amqp_client.start()
207
208     def amqp_collect_nfvi_kpi(self):
209         """ amqp collect and return nfvi kpis """
210         if not self.enable:
211             return {}
212
213         metric = {}
214         while not self._queue.empty():
215             metric.update(self._queue.get())
216         msg = self.parse_collectd_result(metric)
217         return msg
218
219     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
220         template = pkg_resources.resource_string("yardstick.network_services.nfvi",
221                                                  nfvi_cfg).decode('utf-8')
222         cfg_content = jinja2.Template(template, trim_blocks=True, lstrip_blocks=True,
223                                       finalize=finalize_for_yaml).render(
224             **template_kwargs)
225         # cfg_content = io.StringIO(template.format(**template_kwargs))
226         cfg_file = os.path.join(config_file_path, nfvi_cfg)
227         # must write as root, so use sudo
228         self.connection.execute("cat | sudo tee {}".format(cfg_file), stdin=cfg_content)
229
230     def _prepare_collectd_conf(self, config_file_path):
231         """ Prepare collectd conf """
232
233         kwargs = {
234             "interval": self.interval,
235             "loadplugins": set(chain(LIST_PLUGINS_ENABLED, self.plugins.keys())),
236             # Optional fields PortName is descriptive only, use whatever is present
237             "port_names": self.port_names,
238             # "ovs_bridge_interfaces": ["br-int"],
239             "plugins": self.plugins,
240         }
241         self._provide_config_file(config_file_path, self.COLLECTD_CONF, kwargs)
242
243     def _setup_ovs_stats(self, connection):
244         try:
245             socket_path = self.plugins["ovs_stats"].get("ovs_socket_path", self.OVS_SOCKET_PATH)
246         except KeyError:
247             # ovs_stats is not a dict
248             socket_path = self.OVS_SOCKET_PATH
249         status = connection.execute("test -S {}".format(socket_path))[0]
250         if status != 0:
251             LOG.error("cannot find OVS socket %s", socket_path)
252
253     def _start_rabbitmq(self, connection):
254         # Reset amqp queue
255         LOG.debug("reset and setup amqp to collect data from collectd")
256         # ensure collectd.conf.d exists to avoid error/warning
257         cmd_list = ["sudo mkdir -p /etc/collectd/collectd.conf.d",
258                     "sudo service rabbitmq-server restart",
259                     "sudo rabbitmqctl stop_app",
260                     "sudo rabbitmqctl reset",
261                     "sudo rabbitmqctl start_app",
262                     "sudo rabbitmqctl add_user admin admin",
263                     "sudo rabbitmqctl authenticate_user admin admin",
264                     "sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'"
265                     ]
266         for cmd in cmd_list:
267                 exit_status, stdout, stderr = connection.execute(cmd)
268                 if exit_status != 0:
269                     raise ResourceCommandError(command=cmd, stderr=stderr)
270
271         # check stdout for "sudo rabbitmqctl status" command
272         cmd = "sudo rabbitmqctl status"
273         _, stdout, stderr = connection.execute(cmd)
274         if not re.search("RabbitMQ", stdout):
275             LOG.error("rabbitmqctl status don't have RabbitMQ in running apps")
276             raise ResourceCommandError(command=cmd, stderr=stderr)
277
278     def _start_collectd(self, connection, bin_path):
279         LOG.debug("Starting collectd to collect NFVi stats")
280         collectd_path = os.path.join(bin_path, "collectd", "sbin", "collectd")
281         config_file_path = os.path.join(bin_path, "collectd", "etc")
282         self._prepare_collectd_conf(config_file_path)
283
284         connection.execute('sudo pkill -x -9 collectd')
285         exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
286         if exit_status != 0:
287             LOG.warning("%s is not present disabling", collectd_path)
288             return
289         if "ovs_stats" in self.plugins:
290             self._setup_ovs_stats(connection)
291
292         LOG.debug("Starting collectd to collect NFVi stats")
293         LOG.debug("Start collectd service..... %s second timeout", self.timeout)
294         # intel_pmu plug requires large numbers of files open, so try to set
295         # ulimit -n to a large value
296         connection.execute("sudo bash -c 'ulimit -n 1000000 ; %s'" % collectd_path,
297                            timeout=self.timeout)
298         LOG.debug("Done")
299
300     def initiate_systemagent(self, bin_path):
301         """ Start system agent for NFVi collection on host """
302         if self.enable:
303             try:
304                 self._start_rabbitmq(self.connection)
305                 self._start_collectd(self.connection, bin_path)
306             except ResourceCommandError as e:
307                 LOG.exception("Exception during collectd and rabbitmq start: %s", str(e))
308                 raise
309
310     def start(self):
311         """ start nfvi collection """
312         if self.enable:
313             LOG.debug("Start NVFi metric collection...")
314
315     def stop(self):
316         """ stop nfvi collection """
317         if not self.enable:
318             return
319
320         agent = "collectd"
321         LOG.debug("Stop resource monitor...")
322
323         if self.amqp_client is not None:
324             # we proper and try to join first
325             self.amqp_client.join(3)
326             self.amqp_client.terminate()
327
328         LOG.debug("Check if %s is running", agent)
329         status, pid = self.check_if_system_agent_running(agent)
330         LOG.debug("status %s  pid %s", status, pid)
331         if status != 0:
332             return
333
334         if pid:
335             self.connection.execute('sudo kill -9 "%s"' % pid)
336         self.connection.execute('sudo pkill -9 "%s"' % agent)
337         self.connection.execute('sudo service rabbitmq-server stop')
338         self.connection.execute("sudo rabbitmqctl stop_app")