Merge "cachestat: use raw strings to escape \d"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 import logging
18 import os.path
19 import re
20 import multiprocessing
21 from oslo_config import cfg
22
23 from yardstick import ssh
24 from yardstick.network_services.nfvi.collectd import AmqpConsumer
25 from yardstick.network_services.utils import provision_tool
26
# Global oslo.config configuration object.
CONF = cfg.CONF
# NOTE(review): the ZMQ_* constants are not referenced by the code visible in
# this part of the file; presumably a ZeroMQ port number and a polling
# interval -- confirm usage and units against their consumers.
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
30
31
class ResourceProfile(object):
    """
    This profile adds a resource at the beginning of the test session

    It opens an SSH connection to the node described by the VNFD management
    interface, can start/stop collectd (plus rabbitmq) on that node, and can
    consume the metrics published over AMQP and convert them into a dict.
    """

    # Extracts the numeric suffix of a collectd key component, e.g. the "3"
    # in "cpu-3".
    _CORE_INDEX_RE = re.compile(r"-(\d+)")

    def __init__(self, vnfd, cores):
        """ Connect to the management interface of the node

        :param vnfd: mapping with a "mgmt-interface" entry; that entry must
                     contain "ip" (and optionally "host") and is handed to
                     ssh.SSH.from_node
        :param cores: collection of core ids whose cpu metrics are reported
                      by parse_collectd_result
        """
        self.enable = True
        self.connection = None
        self.cores = cores

        mgmt_interface = vnfd.get("mgmt-interface")
        # "host" is preferred over "ip".  The fallback expression is evaluated
        # eagerly, so "ip" has to be present even when "host" is set.
        self.vnfip = mgmt_interface.get("host", mgmt_interface["ip"])
        self.connection = ssh.SSH.from_node(mgmt_interface,
                                            overrides={"ip": self.vnfip})

        self.connection.wait()

    def check_if_sa_running(self, process):
        """ verify if system agent is running

        :param process: pattern passed to ``pgrep -f`` on the remote node
        :return: ``[running, output]`` where ``running`` is True when the
                 command exit status is 0 and ``output`` is the second element
                 returned by ``connection.execute`` (the pgrep output)
        """
        err, pid, _ = self.connection.execute("pgrep -f %s" % process)
        return [err == 0, pid]

    def run_collectd_amqp(self, queue):
        """ run amqp consumer to collect the NFVi data

        Blocks in ``AmqpConsumer.run()``; the consumer is stopped when one of
        AttributeError, RuntimeError or KeyboardInterrupt is raised.

        :param queue: queue object handed to the AmqpConsumer
        """
        # NOTE(review): broker credentials and port are hard-coded here.
        amqp = \
            AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
                         queue)
        try:
            amqp.run()
        except (AttributeError, RuntimeError, KeyboardInterrupt):
            amqp.stop()

    @classmethod
    def get_cpu_data(cls, reskey, value):
        """ Get cpu topology of the host

        :param reskey: collectd key split on "/"; indexes 1 and 2 are read
        :param value: string of the form "<time>:<value>"
        :return: ``[core, metric, value, time]`` when a "-<digits>" core index
                 is found, otherwise the 3-item sentinel
                 ``["error", "Invalid", ""]``
        :raises ValueError: if a core index is found but ``value`` does not
                            split into exactly two parts on ":"
        """
        # For "cpufreq" entries the core index lives in the third component
        # and the second one names the metric; otherwise it is the reverse.
        if "cpufreq" in reskey[1]:
            source, metric = reskey[2], reskey[1]
        else:
            source, metric = reskey[1], reskey[2]

        match = cls._CORE_INDEX_RE.search(source)
        if not match:
            # Decide this before touching ``value`` so that a key without a
            # core index never fails because of the value format.
            return ["error", "Invalid", ""]

        timestamp, val = value.split(":")
        return [str(match.group(1)), metric, val, timestamp]

    def parse_collectd_result(self, metrics, listcores):
        """ convert collectd data into json

        :param metrics: dict mapping "/"-separated collectd keys to
                        "<time>:<value>" strings
        :param listcores: core ids to keep; compared against the core id
                          string extracted from the key
        :return: ``{"cpu": {core: {metric: value}}, "memory": {name: value},
                 "timestamp": time}`` where ``timestamp`` is the time of the
                 last cpu entry processed ("" when there was none)
        """
        res = {"cpu": {}, "memory": {}}
        testcase = ""

        for key, value in metrics.items():
            reskey = key.split("/")
            if len(reskey) < 3:
                # Both branches below index reskey[1] and reskey[2].
                continue

            if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
                cpu_data = self.get_cpu_data(reskey, value)
                if len(cpu_data) != 4:
                    # get_cpu_data returned its 3-item error sentinel (no
                    # core index in the key); skip instead of failing the
                    # 4-way unpacking and losing the whole sample.
                    continue
                cpu_key, name, metric, testcase = cpu_data
                if cpu_key in listcores:
                    res["cpu"].setdefault(cpu_key, {}).update({name: metric})
            elif "memory" in reskey[1]:
                val = value.split(":")[1]
                res["memory"].update({reskey[2]: val})
        res["timestamp"] = testcase

        return res

    def amqp_collect_nfvi_kpi(self, _queue=None):
        """ amqp collect and return nfvi kpis

        Runs run_collectd_amqp in a child process for at most 7 seconds,
        terminates it, drains the queue and parses what was received.

        :param _queue: optional multiprocessing queue to collect into; a
                       fresh one is created per call when omitted
        :return: parsed metrics dict (see parse_collectd_result), or ``{}``
                 when collection fails
        """
        if _queue is None:
            # Created per call: a default built in the signature would be
            # instantiated at import time and shared by every call and every
            # instance.
            _queue = multiprocessing.Queue()

        try:
            metric = {}
            amqp_client = \
                multiprocessing.Process(target=self.run_collectd_amqp,
                                        args=(_queue,))
            amqp_client.start()
            amqp_client.join(7)
            amqp_client.terminate()

            while not _queue.empty():
                metric.update(_queue.get())
        except (AttributeError, RuntimeError, TypeError, ValueError):
            logging.debug("Failed to get NFVi stats...")
            msg = {}
        else:
            msg = self.parse_collectd_result(metric, self.cores)

        return msg

    @classmethod
    def _start_collectd(cls, connection, bin_path):
        """ (Re)start collectd and rabbitmq through ``connection``

        :param connection: object exposing ``execute(cmd)``
        :param bin_path: directory holding collectd.sh, collectd.conf and the
                         collectd/collectd binary
        """
        connection.execute('pkill -9 collectd')
        collectd = os.path.join(bin_path, "collectd.sh")
        provision_tool(connection, collectd)
        provision_tool(connection, os.path.join(bin_path, "collectd.conf"))

        # Reset amqp queue
        connection.execute("sudo service rabbitmq-server start")
        connection.execute("sudo rabbitmqctl stop_app")
        connection.execute("sudo rabbitmqctl reset")
        connection.execute("sudo rabbitmqctl start_app")
        connection.execute("sudo service rabbitmq-server restart")

        # Run collectd
        connection.execute(collectd)
        connection.execute(os.path.join(bin_path, "collectd", "collectd"))

    def initiate_systemagent(self, bin_path):
        """ Start system agent for NFVi collection on host """
        if self.enable:
            self._start_collectd(self.connection, bin_path)

    def start(self):
        """ start nfvi collection """
        if self.enable:
            logging.debug("Start NVFi metric collection...")

    def stop(self):
        """ stop nfvi collection

        When collectd is found running, kills it and stops rabbitmq.
        """
        if not self.enable:
            return

        agent = "collectd"
        logging.debug("Stop resource monitor...")
        status, pid = self.check_if_sa_running(agent)
        if status:
            # pgrep prints one PID per line; collapse all whitespace so that
            # every PID ends up as an argument of a single kill command
            # instead of the newline starting a new shell command.
            pids = " ".join(str(pid).split())
            self.connection.execute('kill -9 %s' % pids)
            self.connection.execute('pkill -9 %s' % agent)
            # NOTE(review): stop_app is issued after the service has been
            # stopped (and the stop itself is run without sudo) -- confirm
            # that this order is intended.
            self.connection.execute('service rabbitmq-server stop')
            self.connection.execute("sudo rabbitmqctl stop_app")