add collectd resource node capability 53/44153/1
authorRoss Brattain <ross.b.brattain@intel.com>
Thu, 28 Sep 2017 07:10:43 +0000 (00:10 -0700)
committerRoss Brattain <ross.b.brattain@intel.com>
Tue, 3 Oct 2017 18:02:09 +0000 (11:02 -0700)
allow manually adding collectd nodes using Node context.

if a node is present with a collectd config dict then
we can create a ResourceProfile object for it
and connect to collectd.

example

nodes:
-
    name: compute_0
    role: Compute
    ip: 1.1.1.1
    user: root
    password: r00t
    collectd:
        interval: 5
        plugins:
            ovs_stats: {}

Change-Id: Ie0c00fdb58373206071daa1fb13faf175c4313e0
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
etc/yardstick/nodes/pod.yaml.collectd.sample [new file with mode: 0644]
tests/unit/network_services/collector/test_subscriber.py
tests/unit/network_services/nfvi/test_resource.py
yardstick/benchmark/scenarios/networking/vnf_generic.py
yardstick/network_services/collector/subscriber.py
yardstick/network_services/nfvi/resource.py

diff --git a/etc/yardstick/nodes/pod.yaml.collectd.sample b/etc/yardstick/nodes/pod.yaml.collectd.sample
new file mode 100644 (file)
index 0000000..6ebf9e2
--- /dev/null
@@ -0,0 +1,25 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+nodes:
+-
+    name: compute_0
+    role: Compute
+    ip: 1.1.1.1
+    user: root
+    password: r00t
+    collectd:
+        interval: 5
+        plugins:
+            ovs_stats: {}
index 373f5dc..260f0bb 100644 (file)
 
 from __future__ import absolute_import
 import unittest
+import mock
 
 from yardstick.network_services.collector import subscriber
 
 
-class CollectorTestCase(unittest.TestCase):
-
-    TRAFFIC_PROFILE = {}
-    VNFS = {}
-
-    def setUp(self):
-        self.test_subscriber = subscriber.Collector(self.TRAFFIC_PROFILE,
-                                                    self.VNFS)
+class MockVnfAprrox(object):
 
-    def test_successful_init(self):
+    def __init__(self):
+        self.result = {}
+        self.name = "vnf__1"
 
-        self.assertEqual(self.test_subscriber.traffic_profile, {})
-        self.assertEqual(self.test_subscriber.service, {})
+    def collect_kpi(self):
+        self.result = {
+            'pkt_in_up_stream': 100,
+            'pkt_drop_up_stream': 5,
+            'pkt_in_down_stream': 50,
+            'pkt_drop_down_stream': 40
+        }
+        return self.result
 
-    def test_unsuccessful_init(self):
-        pass
 
-    def test_start(self):
-        self.assertIsNone(self.test_subscriber.start())
+class CollectorTestCase(unittest.TestCase):
 
-    def test_stop(self):
-        self.assertIsNone(self.test_subscriber.stop())
+    NODES = {
+        'node1': {},
+        'node2': {
+            'collectd': {
+                'plugins': {'abc': 12, 'def': 34},
+                'interval': 987,
+            },
+        },
+    }
+    TRAFFIC_PROFILE = {
+        'key1': 'value1',
+    }
 
-    def test_get_kpi(self):
+    def setUp(self):
+        vnf = MockVnfAprrox()
+        self.ssh_patch = mock.patch('yardstick.network_services.nfvi.resource.ssh', autospec=True)
+        mock_ssh = self.ssh_patch.start()
+        mock_instance = mock.Mock()
+        mock_instance.execute.return_value = 0, '', ''
+        mock_ssh.AutoConnectSSH.from_node.return_value = mock_instance
+        self.collector = subscriber.Collector([vnf], self.NODES, self.TRAFFIC_PROFILE, 1800)
+
+    def tearDown(self):
+        self.ssh_patch.stop()
+
+    def test___init__(self, *_):
+        vnf = MockVnfAprrox()
+        collector = subscriber.Collector([vnf], {}, {})
+        self.assertEqual(len(collector.vnfs), 1)
+        self.assertEqual(collector.traffic_profile, {})
+
+    def test___init___with_data(self, *_):
+        self.assertEqual(len(self.collector.vnfs), 1)
+        self.assertDictEqual(self.collector.traffic_profile, self.TRAFFIC_PROFILE)
+        self.assertEqual(len(self.collector.resource_profiles), 1)
+
+    def test___init___negative(self, *_):
+        pass
 
-        class VnfAprrox(object):
-            def __init__(self):
-                self.result = {}
-                self.name = "vnf__1"
+    def test_start(self, *_):
+        self.assertIsNone(self.collector.start())
 
-            def collect_kpi(self):
-                self.result = {'pkt_in_up_stream': 100,
-                               'pkt_drop_up_stream': 5,
-                               'pkt_in_down_stream': 50,
-                               'pkt_drop_down_stream': 40}
-                return self.result
+    def test_stop(self, *_):
+        self.assertIsNone(self.collector.stop())
 
-        vnf = VnfAprrox()
-        result = self.test_subscriber.get_kpi(vnf)
+    def test_get_kpi(self, *_):
+        result = self.collector.get_kpi()
 
         self.assertEqual(result["vnf__1"]["pkt_in_up_stream"], 100)
         self.assertEqual(result["vnf__1"]["pkt_drop_up_stream"], 5)
         self.assertEqual(result["vnf__1"]["pkt_in_down_stream"], 50)
         self.assertEqual(result["vnf__1"]["pkt_drop_down_stream"], 40)
+        self.assertIn('node2', result)
index eba38c6..4fc6d77 100644 (file)
@@ -105,7 +105,7 @@ class TestResourceProfile(unittest.TestCase):
 
     def test_check_if_sa_running(self):
         self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
-                         [True, {}])
+                         (True, {}))
 
     def test_get_cpu_data(self):
         reskey = ["", "cpufreq", "cpufreq-0"]
index 3f61116..d9cc0ea 100644 (file)
@@ -607,7 +607,7 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
             traffic_gen.listen_traffic(self.traffic_profile)
 
         # register collector with yardstick for KPI collection.
-        self.collector = Collector(self.vnfs, self.traffic_profile)
+        self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
         self.collector.start()
 
         # Start the actual traffic
@@ -623,11 +623,11 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
         :return: None
         """
 
-        for vnf in self.vnfs:
-            # Result example:
-            # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
-            LOG.debug("collect KPI for %s", vnf.name)
-            result.update(self.collector.get_kpi(vnf))
+        # this is the only method that is check from the runner
+        # so if we have any fatal error it must be raised via these methods
+        # otherwise we will not terminate
+
+        result.update(self.collector.get_kpi())
 
     def teardown(self):
         """ Stop the collector and terminate VNF & TG instance
index 3bcb208..db52e0b 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 """This module implements stub for publishing results in yardstick format."""
+import logging
+
+from yardstick.network_services.nfvi.resource import ResourceProfile
+from yardstick.network_services.utils import get_nsb_option
+
+LOG = logging.getLogger(__name__)
 
 
 class Collector(object):
     """Class that handles dictionary of results in yardstick-plot format."""
 
-    def __init__(self, traffic_profile, vnfs):
+    @staticmethod
+    def make_resource_profile(node, timeout):
+        # node dict works as mgmt dict
+        # don't need port names, there is no way we can
+        # tell what port is used on the compute node
+        collectd_options = node["collectd"]
+        plugins = collectd_options.get("plugins", {})
+        interval = collectd_options.get("interval")
+
+        # use default cores = None to MatchAllCores
+        return ResourceProfile(node, plugins=plugins, interval=interval, timeout=timeout)
+
+    def __init__(self, vnfs, nodes, traffic_profile, timeout=3600):
         super(Collector, self).__init__()
         self.traffic_profile = traffic_profile
-        self.service = vnfs
+        self.vnfs = vnfs
+        self.nodes = nodes
+        self.timeout = timeout
+        self.bin_path = get_nsb_option('bin_path', '')
+        self.resource_profiles = {node_name: self.make_resource_profile(node, self.timeout)
+                                  for node_name, node in self.nodes.items()
+                                  if node.get("collectd")}
 
     def start(self):
         """Nothing to do, yet"""
-        pass
+        for resource in self.resource_profiles.values():
+            resource.initiate_systemagent(self.bin_path)
+            resource.start()
+            resource.amqp_process_for_nfvi_kpi()
 
     def stop(self):
         """Nothing to do, yet"""
-        pass
+        for resource in self.resource_profiles.values():
+            resource.stop()
 
-    @classmethod
-    def get_kpi(cls, vnf):
+    def get_kpi(self):
         """Returns dictionary of results in yardstick-plot format
 
         :return:
         """
-        return {vnf.name: vnf.collect_kpi()}
+        results = {}
+        for vnf in self.vnfs:
+            # Result example:
+            # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
+            LOG.debug("collect KPI for %s", vnf.name)
+            results[vnf.name] = vnf.collect_kpi()
+
+        for node_name, resource in self.resource_profiles.items():
+            # Result example:
+            # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
+            LOG.debug("collect KPI for %s", node_name)
+            if not resource.check_if_sa_running("collectd")[0]:
+                continue
+
+            try:
+                results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()}
+            except Exception:
+                LOG.exception("")
+        return results
index d807f5e..7e8334c 100644 (file)
@@ -51,16 +51,26 @@ class ResourceProfile(object):
     COLLECTD_CONF = "collectd.conf"
     AMPQ_PORT = 5672
     DEFAULT_INTERVAL = 25
+    DEFAULT_TIMEOUT = 3600
+
+    def __init__(self, mgmt, port_names=None, cores=None, plugins=None,
+                 interval=None, timeout=None):
 
-    def __init__(self, mgmt, port_names=None, cores=None, plugins=None, interval=None):
         if plugins is None:
             self.plugins = {}
         else:
             self.plugins = plugins
+
         if interval is None:
             self.interval = self.DEFAULT_INTERVAL
         else:
             self.interval = interval
+
+        if timeout is None:
+            self.timeout = self.DEFAULT_TIMEOUT
+        else:
+            self.timeout = timeout
+
         self.enable = True
         self.cores = validate_non_string_sequence(cores, default=[])
         self._queue = multiprocessing.Queue()
@@ -73,8 +83,8 @@ class ResourceProfile(object):
 
     def check_if_sa_running(self, process):
         """ verify if system agent is running """
-        err, pid, _ = self.connection.execute("pgrep -f %s" % process)
-        return [err == 0, pid]
+        status, pid, _ = self.connection.execute("pgrep -f %s" % process)
+        return status == 0, pid
 
     def run_collectd_amqp(self):
         """ run amqp consumer to collect the NFVi data """