Merge "Add a new runner to test end-to-end fast data path"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Resource collection definitions """
15
16 from __future__ import absolute_import
17 import logging
18 import os.path
19 import re
20 import multiprocessing
21 from oslo_config import cfg
22
23 from yardstick import ssh
24 from yardstick.network_services.nfvi.collectd import AmqpConsumer
25 from yardstick.network_services.utils import provision_tool
26
# oslo.config's global configuration object.
CONF = cfg.CONF
# NOTE(review): the two ZMQ_* constants below are not referenced anywhere in
# the visible part of this module; presumably they are consumed by ZMQ-based
# OVS statistics collection elsewhere -- confirm before removing. The unit of
# ZMQ_POLLING_TIME is not established here (TODO confirm).
ZMQ_OVS_PORT = 5567
ZMQ_POLLING_TIME = 12000
30
31
class ResourceProfile(object):
    """
    This profile adds a resource at the beginning of the test session

    On construction an SSH connection to the VNF management interface is
    opened.  The remaining methods use that connection to start and stop
    collectd / rabbitmq on the remote host and to consume the metrics that
    collectd publishes over AMQP.
    """

    def __init__(self, vnfd, cores):
        """ Open the SSH connection described by ``vnfd["mgmt-interface"]``

        :param vnfd: mapping with a "mgmt-interface" entry; that entry is
            read for "user", "password", "ip" and, optionally, "host"
            (preferred over "ip") and "ssh_port"
        :param cores: container of core ids; parsed collectd core ids
            (strings) are tested for membership in it when building KPIs
        """
        self.enable = True
        self.cores = cores

        mgmt_interface = vnfd.get("mgmt-interface")
        user = mgmt_interface.get("user")
        passwd = mgmt_interface.get("password")
        ip_addr = mgmt_interface.get("ip")
        # "host" wins over "ip" when both are present
        self.vnfip = mgmt_interface.get("host", ip_addr)
        ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
        self.connection = ssh.SSH(user, self.vnfip,
                                  password=passwd, port=ssh_port)
        # presumably blocks until the SSH endpoint is reachable -- confirm
        # against yardstick.ssh
        self.connection.wait()

    def check_if_sa_running(self, process):
        """ verify if system agent is running

        :param process: pattern handed to ``pgrep -f`` on the remote host
        :return: two-element list ``[running, pid]`` where ``running`` is
            True when the remote command reported status 0 and ``pid`` is
            the second value returned by ``connection.execute`` (the raw
            pgrep output, possibly several newline-separated pids)
        """
        err, pid, _ = self.connection.execute("pgrep -f %s" % process)
        return [err == 0, pid]

    def run_collectd_amqp(self, queue):
        """ run amqp consumer to collect the NFVi data

        :param queue: queue object handed to ``AmqpConsumer``
        """
        amqp = \
            AmqpConsumer('amqp://admin:admin@{}:5672/%2F'.format(self.vnfip),
                         queue)
        try:
            amqp.run()
        except (AttributeError, RuntimeError, KeyboardInterrupt):
            amqp.stop()

    @classmethod
    def get_cpu_data(cls, reskey, value):
        """ Get cpu topology of the host

        :param reskey: collectd key split on "/"; elements 1 and 2 are read
        :param value: string of the form ``"<time>:<value>"``
        :return: ``[core_id, metric_name, value, time]``, or the sentinel
            ``["error", "Invalid", "", ""]`` when no ``-<digits>`` core id
            is found.  Both shapes have four elements so callers can always
            unpack four values.
        :raises ValueError: when ``value`` does not split into exactly two
            ":"-separated fields
        """
        pattern = r"-(\d+)"
        # for "cpufreq" metrics the core id sits in the third key element,
        # otherwise in the second one
        if "cpufreq" in reskey[1]:
            match = re.search(pattern, reskey[2])
            metric = reskey[1]
        else:
            match = re.search(pattern, reskey[1])
            metric = reskey[2]

        time, val = value.split(":")
        if match:
            return [str(match.group(1)), metric, val, time]

        return ["error", "Invalid", "", ""]

    def parse_collectd_result(self, metrics, listcores):
        """ convert collectd data into json

        :param metrics: mapping of "/"-separated collectd keys to
            ``"<time>:<value>"`` strings
        :param listcores: container of core ids (strings) to keep
        :return: dict with "cpu" (per-core metrics for cores in
            ``listcores``), "memory" and "timestamp" entries
        """
        res = {"cpu": {}, "memory": {}}
        testcase = ""

        for key, value in metrics.items():
            reskey = key.split("/")
            if len(reskey) < 3:
                # a malformed key must not abort the whole collection
                logging.debug("Skipping unexpected collectd key: %s", key)
                continue

            if "cpu" in reskey[1] or "intel_rdt" in reskey[1]:
                cpu_key, name, metric, stamp = \
                    self.get_cpu_data(reskey, value)
                if cpu_key in listcores:
                    res["cpu"].setdefault(cpu_key, {})[name] = metric
                if stamp:
                    # the error sentinel carries an empty time; keep the
                    # last real timestamp instead of clobbering it
                    testcase = stamp
            elif "memory" in reskey[1]:
                val = re.split(":", value)[1]
                res["memory"][reskey[2]] = val
        res["timestamp"] = testcase

        return res

    def amqp_collect_nfvi_kpi(self, _queue=None):
        """ amqp collect and return nfvi kpis

        :param _queue: optional multiprocessing queue to receive the
            consumer output; a fresh one is created per call when omitted
        :return: parsed KPI dict, or ``{}`` when collection failed
        """
        if _queue is None:
            # A default created in the signature would be built once at
            # definition time and shared by every call and instance; since
            # the consumer process is terminated below, such a shared queue
            # could carry stale or corrupted data into later collections.
            _queue = multiprocessing.Queue()

        try:
            metric = {}
            amqp_client = \
                multiprocessing.Process(target=self.run_collectd_amqp,
                                        args=(_queue,))
            amqp_client.start()
            # give the consumer up to 7 seconds, then stop it regardless
            amqp_client.join(7)
            amqp_client.terminate()

            while not _queue.empty():
                metric.update(_queue.get())
        except (AttributeError, RuntimeError, TypeError, ValueError):
            logging.debug("Failed to get NFVi stats...")
            msg = {}
        else:
            msg = self.parse_collectd_result(metric, self.cores)

        return msg

    @classmethod
    def _start_collectd(cls, connection, bin_path):
        """ Restart collectd and reset rabbitmq through ``connection``

        :param connection: object exposing ``execute(cmd)``
        :param bin_path: directory holding "collectd.sh", "collectd.conf"
            and the "collectd/collectd" binary
        """
        connection.execute('pkill -9 collectd')
        collectd = os.path.join(bin_path, "collectd.sh")
        # provision_tool is defined elsewhere; presumably it makes the given
        # file available on the remote host -- confirm
        provision_tool(connection, collectd)
        provision_tool(connection, os.path.join(bin_path, "collectd.conf"))

        # Reset amqp queue
        connection.execute("sudo service rabbitmq-server start")
        connection.execute("sudo rabbitmqctl stop_app")
        connection.execute("sudo rabbitmqctl reset")
        connection.execute("sudo rabbitmqctl start_app")
        connection.execute("sudo service rabbitmq-server restart")

        # Run collectd
        connection.execute(collectd)
        connection.execute(os.path.join(bin_path, "collectd", "collectd"))

    def initiate_systemagent(self, bin_path):
        """ Start system agent for NFVi collection on host """
        if self.enable:
            self._start_collectd(self.connection, bin_path)

    def start(self):
        """ start nfvi collection """
        if self.enable:
            logging.debug("Start NVFi metric collection...")

    def stop(self):
        """ stop nfvi collection

        Kills collectd on the remote host and stops rabbitmq, but only when
        the profile is enabled and a collectd process is reported running.
        """
        if not self.enable:
            return

        agent = "collectd"
        logging.debug("Stop resource monitor...")
        status, pid = self.check_if_sa_running(agent)
        if not status:
            return

        # pgrep prints one pid per line; collapse them onto a single line so
        # the embedded newlines do not split the kill command in the shell
        pids = " ".join(str(pid).split())
        if pids:
            self.connection.execute('kill -9 %s' % pids)
        self.connection.execute('pkill -9 %s' % agent)
        # NOTE(review): unlike _start_collectd this service call has no
        # "sudo", and stop_app runs after the service was stopped -- confirm
        # whether that is intended
        self.connection.execute('service rabbitmq-server stop')
        self.connection.execute("sudo rabbitmqctl stop_app")