1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
16 from __future__ import absolute_import
20 import multiprocessing
21 from oslo_config import cfg
23 from yardstick import ssh
24 from yardstick.network_services.nfvi.collectd import AmqpConsumer
25 from yardstick.network_services.utils import provision_tool
29 ZMQ_POLLING_TIME = 12000
32 class ResourceProfile(object):
34 This profile adds a resource at the beginning of the test session
37 def __init__(self, vnfd, cores):
39 self.connection = None
42 mgmt_interface = vnfd.get("mgmt-interface")
44 self.vnfip = mgmt_interface.get("host", mgmt_interface["ip"])
45 self.connection = ssh.SSH.from_node(mgmt_interface,
46 overrides={"ip": self.vnfip})
48 self.connection.wait()
50 def check_if_sa_running(self, process):
51 """ verify if system agent is running """
52 err, pid, _ = self.connection.execute("pgrep -f %s" % process)
53 return [err == 0, pid]
55 def run_collectd_amqp(self, queue):
56 """ run amqp consumer to collect the NFVi data """
58 AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
62 except (AttributeError, RuntimeError, KeyboardInterrupt):
66 def get_cpu_data(cls, reskey, value):
67 """ Get cpu topology of the host """
69 if "cpufreq" in reskey[1]:
70 match = re.search(pattern, reskey[2], re.MULTILINE)
73 match = re.search(pattern, reskey[1], re.MULTILINE)
76 time, val = re.split(":", value)
78 return [str(match.group(1)), metric, val, time]
80 return ["error", "Invalid", ""]
82 def parse_collectd_result(self, metrics, listcores):
83 """ convert collectd data into json"""
84 res = {"cpu": {}, "memory": {}}
87 for key, value in metrics.items():
88 reskey = key.rsplit("/")
89 if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
90 cpu_key, name, metric, testcase = \
91 self.get_cpu_data(reskey, value)
92 if cpu_key in listcores:
93 res["cpu"].setdefault(cpu_key, {}).update({name: metric})
94 elif "memory" in reskey[1]:
95 val = re.split(":", value)[1]
96 res["memory"].update({reskey[2]: val})
97 res["timestamp"] = testcase
101 def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
102 """ amqp collect and return nfvi kpis """
106 multiprocessing.Process(target=self.run_collectd_amqp,
110 amqp_client.terminate()
112 while not _queue.empty():
113 metric.update(_queue.get())
114 except (AttributeError, RuntimeError, TypeError, ValueError):
115 logging.debug("Failed to get NFVi stats...")
118 msg = self.parse_collectd_result(metric, self.cores)
123 def _start_collectd(cls, connection, bin_path):
124 connection.execute('pkill -9 collectd')
125 collectd = os.path.join(bin_path, "collectd.sh")
126 provision_tool(connection, collectd)
127 provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
130 connection.execute("sudo service rabbitmq-server start")
131 connection.execute("sudo rabbitmqctl stop_app")
132 connection.execute("sudo rabbitmqctl reset")
133 connection.execute("sudo rabbitmqctl start_app")
134 connection.execute("sudo service rabbitmq-server restart")
137 connection.execute(collectd)
138 connection.execute(os.path.join(bin_path, "collectd", "collectd"))
140 def initiate_systemagent(self, bin_path):
141 """ Start system agent for NFVi collection on host """
143 self._start_collectd(self.connection, bin_path)
146 """ start nfvi collection """
148 logging.debug("Start NVFi metric collection...")
151 """ stop nfvi collection """
154 logging.debug("Stop resource monitor...")
155 status, pid = self.check_if_sa_running(agent)
157 self.connection.execute('kill -9 %s' % pid)
158 self.connection.execute('pkill -9 %s' % agent)
159 self.connection.execute('service rabbitmq-server stop')
160 self.connection.execute("sudo rabbitmqctl stop_app")