1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
16 from __future__ import absolute_import
20 import multiprocessing
21 from oslo_config import cfg
23 from yardstick import ssh
24 from yardstick.network_services.nfvi.collectd import AmqpConsumer
25 from yardstick.network_services.utils import provision_tool
29 ZMQ_POLLING_TIME = 12000
32 class ResourceProfile(object):
34 This profile adds a resource at the beginning of the test session
37 def __init__(self, vnfd, cores):
39 self.connection = None
42 mgmt_interface = vnfd.get("mgmt-interface")
43 user = mgmt_interface.get("user")
44 passwd = mgmt_interface.get("password")
45 ip_addr = mgmt_interface.get("ip")
46 self.vnfip = mgmt_interface.get("host", ip_addr)
47 ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
48 self.connection = ssh.SSH(user, self.vnfip,
49 password=passwd, port=ssh_port)
50 self.connection.wait()
52 def check_if_sa_running(self, process):
53 """ verify if system agent is running """
54 err, pid, _ = self.connection.execute("pgrep -f %s" % process)
55 return [err == 0, pid]
57 def run_collectd_amqp(self, queue):
58 """ run amqp consumer to collect the NFVi data """
60 AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
64 except (AttributeError, RuntimeError, KeyboardInterrupt):
68 def get_cpu_data(cls, reskey, value):
69 """ Get cpu topology of the host """
71 if "cpufreq" in reskey[1]:
72 match = re.search(pattern, reskey[2], re.MULTILINE)
75 match = re.search(pattern, reskey[1], re.MULTILINE)
78 time, val = re.split(":", value)
80 return [str(match.group(1)), metric, val, time]
82 return ["error", "Invalid", ""]
84 def parse_collectd_result(self, metrics, listcores):
85 """ convert collectd data into json"""
86 res = {"cpu": {}, "memory": {}}
89 for key, value in metrics.items():
90 reskey = key.rsplit("/")
91 if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
92 cpu_key, name, metric, testcase = \
93 self.get_cpu_data(reskey, value)
94 if cpu_key in listcores:
95 res["cpu"].setdefault(cpu_key, {}).update({name: metric})
96 elif "memory" in reskey[1]:
97 val = re.split(":", value)[1]
98 res["memory"].update({reskey[2]: val})
99 res["timestamp"] = testcase
103 def amqp_collect_nfvi_kpi(self, _queue=multiprocessing.Queue()):
104 """ amqp collect and return nfvi kpis """
108 multiprocessing.Process(target=self.run_collectd_amqp,
112 amqp_client.terminate()
114 while not _queue.empty():
115 metric.update(_queue.get())
116 except (AttributeError, RuntimeError, TypeError, ValueError):
117 logging.debug("Failed to get NFVi stats...")
120 msg = self.parse_collectd_result(metric, self.cores)
125 def _start_collectd(cls, connection, bin_path):
126 connection.execute('pkill -9 collectd')
127 collectd = os.path.join(bin_path, "collectd.sh")
128 provision_tool(connection, collectd)
129 provision_tool(connection, os.path.join(bin_path, "collectd.conf"))
132 connection.execute("sudo service rabbitmq-server start")
133 connection.execute("sudo rabbitmqctl stop_app")
134 connection.execute("sudo rabbitmqctl reset")
135 connection.execute("sudo rabbitmqctl start_app")
136 connection.execute("sudo service rabbitmq-server restart")
139 connection.execute(collectd)
140 connection.execute(os.path.join(bin_path, "collectd", "collectd"))
142 def initiate_systemagent(self, bin_path):
143 """ Start system agent for NFVi collection on host """
145 self._start_collectd(self.connection, bin_path)
148 """ start nfvi collection """
150 logging.debug("Start NVFi metric collection...")
153 """ stop nfvi collection """
156 logging.debug("Stop resource monitor...")
157 status, pid = self.check_if_sa_running(agent)
159 self.connection.execute('kill -9 %s' % pid)
160 self.connection.execute('pkill -9 %s' % agent)
161 self.connection.execute('service rabbitmq-server stop')
162 self.connection.execute("sudo rabbitmqctl stop_app")