from yardstick import ssh
from yardstick.network_services.nfvi.collectd import AmqpConsumer
-from yardstick.network_services.utils import provision_tool
+from yardstick.network_services.utils import get_nsb_option
LOG = logging.getLogger(__name__)
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "intel_rdt", "memory",
- "hugepages", "dpdkstat", "virt", "ovs_stats"]
+ "hugepages", "dpdkstat", "virt", "ovs_stats", "intel_pmu"]
class ResourceProfile(object):
@classmethod
def parse_simple_resource(cls, key, value):
- return {'/'.join(key): value.split(":")[1]}
+ reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
+ return {reskey: value.split(":")[1]}
@classmethod
- def get_cpu_data(cls, key_split, value):
+ def get_cpu_data(cls, res_key0, res_key1, value):
""" Get cpu topology of the host """
pattern = r"-(\d+)"
- if "cpufreq" in key_split[0]:
- metric = key_split[0]
- source = key_split[1]
+
+ if 'cpufreq' in res_key0:
+ metric, source = res_key0, res_key1
else:
- metric = key_split[1]
- source = key_split[0]
+ metric, source = res_key1, res_key0
match = re.search(pattern, source, re.MULTILINE)
if not match:
def parse_ovs_stats(cls, key, value):
return cls.parse_simple_resource(key, value)
+ @classmethod
+ def parse_intel_pmu_stats(cls, key, value):
+ return {''.join(key): value.split(":")[1]}
+
def parse_collectd_result(self, metrics, core_list):
""" convert collectd data into json"""
result = {
"dpdkstat": {},
"virt": {},
"ovs_stats": {},
+ "intel_pmu": {},
}
testcase = ""
res_key1 = next(res_key_iter)
if "cpu" in res_key0 or "intel_rdt" in res_key0:
- cpu_key, name, metric, testcase = self.get_cpu_data(key_split, value)
+ cpu_key, name, metric, testcase = \
+ self.get_cpu_data(res_key0, res_key1, value)
if cpu_key in core_list:
result["cpu"].setdefault(cpu_key, {}).update({name: metric})
result["memory"].update({res_key1: value.split(":")[0]})
elif "hugepages" in res_key0:
- result["hugepages"].update(self.parse_hugepages(key, value))
+ result["hugepages"].update(self.parse_hugepages(key_split, value))
elif "dpdkstat" in res_key0:
- result["dpdkstat"].update(self.parse_dpdkstat(key, value))
+ result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
elif "virt" in res_key1:
- result["virt"].update(self.parse_virt(key, value))
+ result["virt"].update(self.parse_virt(key_split, value))
elif "ovs_stats" in res_key0:
- result["ovs_stats"].update(self.parse_ovs_stats(key, value))
+ result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
+
+ elif "intel_pmu-all" in res_key0:
+ result["intel_pmu"].update(self.parse_intel_pmu_stats(res_key1, value))
result["timestamp"] = testcase
def amqp_process_for_nfvi_kpi(self):
""" amqp collect and return nfvi kpis """
- if self.amqp_client is None:
+ if self.amqp_client is None and self.enable:
self.amqp_client = \
multiprocessing.Process(target=self.run_collectd_amqp)
self.amqp_client.start()
def amqp_collect_nfvi_kpi(self):
""" amqp collect and return nfvi kpis """
+ if not self.enable:
+ return {}
+
metric = {}
while not self._queue.empty():
metric.update(self._queue.get())
"loadplugin": loadplugin,
"dpdk_interface": interfaces,
}
-
self._provide_config_file(bin_path, 'collectd.conf', kwargs)
def _start_collectd(self, connection, bin_path):
LOG.debug("Starting collectd to collect NFVi stats")
- # temp disable
- return
connection.execute('sudo pkill -9 collectd')
- collectd = os.path.join(bin_path, "collectd.sh")
- provision_tool(connection, collectd)
+ bin_path = get_nsb_option("bin_path")
+ collectd_path = os.path.join(bin_path, "collectd", "collectd")
+ exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
+ if exit_status != 0:
+ LOG.warning("%s is not present disabling", collectd_path)
+ # disable auto-provisioning because it requires Internet access
+ # collectd_installer = os.path.join(bin_path, "collectd.sh")
+ # provision_tool(connection, collectd)
+ # http_proxy = os.environ.get('http_proxy', '')
+ # https_proxy = os.environ.get('https_proxy', '')
+ # connection.execute("sudo %s '%s' '%s'" % (
+ # collectd_installer, http_proxy, https_proxy))
+ return
+ LOG.debug("Starting collectd to collect NFVi stats")
self._prepare_collectd_conf(bin_path)
# Reset amqp queue
connection.execute("sudo rabbitmqctl start_app")
connection.execute("sudo service rabbitmq-server restart")
- # Run collectd
+ LOG.debug("Creating amdin user for rabbitmq in order to collect data from collectd")
+ connection.execute("sudo rabbitmqctl delete_user guest")
+ connection.execute("sudo rabbitmqctl add_user admin admin")
+ connection.execute("sudo rabbitmqctl authenticate_user admin admin")
+ connection.execute("sudo rabbitmqctl set_permissions -p / admin \".*\" \".*\" \".*\"")
- http_proxy = os.environ.get('http_proxy', '')
- https_proxy = os.environ.get('https_proxy', '')
- connection.execute("sudo %s '%s' '%s'" %
- (collectd, http_proxy, https_proxy))
LOG.debug("Start collectd service.....")
- connection.execute(
- "sudo %s" % os.path.join(bin_path, "collectd", "collectd"))
+ connection.execute("sudo %s" % collectd_path)
LOG.debug("Done")
def initiate_systemagent(self, bin_path):