Merge "Added NSB descriptors for vCMTS testcase"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / vcmts_vnf.py
1 # Copyright (c) 2019 Viosoft Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import logging
16 import os
17 import yaml
18
19 from influxdb import InfluxDBClient
20
21 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SetupEnvHelper
22 from yardstick.common import constants
23 from yardstick.common import exceptions
24 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
25 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper
26 from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
27 from yardstick.network_services.utils import get_nsb_option
28
29
30 LOG = logging.getLogger(__name__)
31
32
class InfluxDBHelper(object):
    """Copy vCMTS KPI measurements from one InfluxDB server to another.

    Points are read from the ``collectd`` database of the server given to
    the constructor and written to the ``collectd`` database of the server
    at ``constants.INFLUXDB_IP``:``constants.INFLUXDB_PORT``. For every
    measurement the timestamp of the last copied point is remembered, so
    each call only transfers points newer than the previous call.
    """

    # Lower time bound used for a measurement that has not been copied yet.
    INITIAL_VALUE = 'now() - 1m'

    # (measurement, copied field columns) pairs, in the order they are copied.
    _KPI_MEASUREMENTS = (
        ("cpu_value", ["instance", "type_instance", "value"]),
        ("cpufreq_value", ["type_instance", "value"]),
        ("downstream_rx", ["value"]),
        ("downstream_tx", ["value"]),
        ("downstream_value", ["value"]),
        ("ds_per_cm_value", ["instance", "value"]),
        ("intel_rdt_value", ["instance", "type_instance", "value"]),
        ("turbostat_value", ["instance", "type_instance", "value"]),
        ("upstream_rx", ["value"]),
        ("upstream_tx", ["value"]),
        ("upstream_value", ["value"]),
    )

    def __init__(self, vcmts_influxdb_ip, vcmts_influxdb_port):
        """Store the address of the source InfluxDB; no connection is made.

        :param vcmts_influxdb_ip: host of the InfluxDB server to read from
        :param vcmts_influxdb_port: port of the InfluxDB server to read from
        """
        self._vcmts_influxdb_ip = vcmts_influxdb_ip
        self._vcmts_influxdb_port = vcmts_influxdb_port
        # NOTE(review): not read anywhere in this class; kept in case code
        # outside this file inspects it.
        self._last_upstream_rx = self.INITIAL_VALUE
        # measurement name -> quoted timestamp of the last copied point
        self._last_values_time = dict()
        # Both clients are created by start().
        self._read_client = None
        self._write_client = None

    def start(self):
        """Create the InfluxDB clients used for reading and writing."""
        self._read_client = InfluxDBClient(host=self._vcmts_influxdb_ip,
                                           port=self._vcmts_influxdb_port,
                                           database='collectd')
        self._write_client = InfluxDBClient(host=constants.INFLUXDB_IP,
                                            port=constants.INFLUXDB_PORT,
                                            database='collectd')

    def _get_last_value_time(self, measurement):
        """Return the time bound to use in the next query of `measurement`.

        This is the quoted timestamp of the last copied point, or
        INITIAL_VALUE when nothing has been copied for it yet.
        """
        return self._last_values_time.get(measurement, self.INITIAL_VALUE)

    def _set_last_value_time(self, measurement, time):
        """Record `time` (a string) as the last copied point of `measurement`.

        The value is stored wrapped in single quotes so that it can be
        inserted verbatim into the query text.
        """
        self._last_values_time[measurement] = "'" + time + "'"

    def _query_measurement(self, measurement):
        """Fetch the points of `measurement` newer than the last copied one.

        :return: an iterable of points ordered by ascending time, or None
                 when the query returned no series
        """
        # There is a delay before influxdb flushes the data
        query = "SELECT * FROM {0} WHERE time > {1} ORDER BY time ASC;".format(
            measurement, self._get_last_value_time(measurement))
        query_result = self._read_client.query(query)
        if not query_result.keys():
            return None
        return query_result.get_points(measurement)

    def _rw_measurment(self, measurement, columns):
        """Copy the new points of `measurement` to the destination database.

        :param measurement: name of the measurement to copy
        :param columns: names of the columns copied as fields; the 'value'
                        column is converted to float, others are copied as is
        """
        query_result = self._query_measurement(measurement)
        if query_result is None:
            return

        points_to_write = list()
        for entry in query_result:
            fields = {}
            for column in columns:
                if column == 'value':
                    fields[column] = float(entry[column])
                else:
                    fields[column] = entry[column]

            points_to_write.append({
                "measurement": measurement,
                "tags": {
                    "type": entry['type'],
                    "host": entry['host'],
                },
                "time": entry['time'],
                "fields": fields,
            })
            # Results are ordered by ascending time, so the last entry seen
            # becomes the lower bound of the next query.
            self._set_last_value_time(measurement, entry['time'])

        # Nothing new was read: avoid a pointless write request.
        if not points_to_write:
            return

        # Write the points to yardstick database
        if self._write_client.write_points(points_to_write):
            LOG.debug("%d new points written to '%s' measurement",
                      len(points_to_write), measurement)

    def copy_kpi(self):
        """Copy the new points of every known vCMTS KPI measurement."""
        for measurement, columns in self._KPI_MEASUREMENTS:
            self._rw_measurment(measurement, columns)
112
113
class VcmtsdSetupEnvHelper(SetupEnvHelper):
    """Build and send the shell command that launches a vcmtsd instance."""

    # Environment exported in front of every vcmtsd command line.
    BASE_PARAMETERS = ("export LD_LIBRARY_PATH=/opt/collectd/lib:;"
                       "export CMK_PROC_FS=/host/proc;")

    # Start of the "cmk isolate" invocation, up to the socket id value.
    _CMK_PREFIX = "  /opt/bin/cmk isolate --conf-dir=/etc/cmk --socket-id="

    @staticmethod
    def _append_args(command, pod_cfg, arguments):
        """Append space-separated pod settings to `command`.

        `arguments` is a sequence of (pod_cfg key, literal suffix) pairs;
        each contributes " <value><suffix>" to the returned string.
        """
        for key, suffix in arguments:
            command += " " + pod_cfg[key] + suffix
        return command

    def build_us_parameters(self, pod_cfg):
        """Return the command line running the upstream script for a pod.

        :param pod_cfg: mapping holding the pod settings (string values)
        """
        command = self.BASE_PARAMETERS + self._CMK_PREFIX
        command += pod_cfg['cpu_socket_id']
        command += " --pool=shared /vcmts-config/run_upstream.sh "
        command += pod_cfg['sg_id']
        return self._append_args(command, pod_cfg, (
            ('ds_core_type', ""),
            ('num_ofdm', "ofdm"),
            ('num_subs', "cm"),
            ('cm_crypto', ""),
            ('qat', ""),
            ('net_us', ""),
            ('power_mgmt', ""),
        ))

    def build_ds_parameters(self, pod_cfg):
        """Return the command line running the downstream script for a pod.

        :param pod_cfg: mapping holding the pod settings (string values)
        """
        command = self.BASE_PARAMETERS + self._CMK_PREFIX
        command += pod_cfg['cpu_socket_id']
        command += " --pool=" + pod_cfg['ds_core_type']
        command += " /vcmts-config/run_downstream.sh " + pod_cfg['sg_id']
        return self._append_args(command, pod_cfg, (
            ('ds_core_type', ""),
            ('ds_core_pool_index', ""),
            ('num_ofdm', "ofdm"),
            ('num_subs', "cm"),
            ('cm_crypto', ""),
            ('qat', ""),
            ('net_ds', ""),
            ('power_mgmt', ""),
        ))

    def build_cmd(self, stream_dir, pod_cfg):
        """Return the downstream command for 'ds', the upstream one otherwise."""
        if stream_dir == 'ds':
            builder = self.build_ds_parameters
        else:
            builder = self.build_us_parameters
        return builder(pod_cfg)

    def run_vcmtsd(self, stream_dir, pod_cfg):
        """Build the vcmtsd command and send it through the SSH helper."""
        command = self.build_cmd(stream_dir, pod_cfg)
        LOG.debug("Executing %s", command)
        self.ssh_helper.send_command(command)

    def setup_vnf_environment(self):
        """Do nothing; this helper performs no environment setup."""
161
162
class VcmtsVNF(GenericVNF):
    """VNF wrapper that starts vcmtsd on a pod and relays its KPIs.

    ``instantiate`` reads the pod settings from the ``vcmtsd_values`` file
    named in the scenario options and sends the vcmtsd start command over
    SSH. ``collect_kpi`` copies the measurements through an
    ``InfluxDBHelper`` created by ``start_collect``.
    """

    RUN_WAIT = 4

    def __init__(self, name, vnfd):
        super(VcmtsVNF, self).__init__(name, vnfd)
        self.name = name
        self.bin_path = get_nsb_option('bin_path', '')
        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        self.setup_helper = VcmtsdSetupEnvHelper(self.vnfd_helper,
                                                 self.ssh_helper,
                                                 self.scenario_helper)

    def extract_pod_cfg(self, vcmts_pods_cfg, sg_id):
        """Return the first pod configuration whose 'sg_id' equals `sg_id`.

        :param vcmts_pods_cfg: iterable of pod configuration mappings
        :param sg_id: service group identifier to look for
        :return: the matching mapping, or None when there is no match
        """
        for pod_cfg in vcmts_pods_cfg:
            if pod_cfg['sg_id'] == sg_id:
                return pod_cfg
        return None

    def instantiate(self, scenario_cfg, context_cfg):
        """Validate the scenario options and start vcmtsd for this VNF.

        :param scenario_cfg: scenario configuration; its 'options' section
                             must provide 'vcmts_influxdb_ip',
                             'vcmts_influxdb_port', 'vcmtsd_values' and,
                             under this VNF's name, 'sg_id' and 'stream_dir'
        :param context_cfg: context configuration
        :raises KeyError: when a required option or the 'vcmts_pods' key of
                          the values file is missing
        :raises RuntimeError: when the values file does not exist or is empty
        :raises exceptions.IncorrectConfig: when no pod matches 'sg_id'
        """
        self._update_collectd_options(scenario_cfg, context_cfg)
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg

        options = scenario_cfg.get('options', {})

        try:
            self.vcmts_influxdb_ip = options['vcmts_influxdb_ip']
            self.vcmts_influxdb_port = options['vcmts_influxdb_port']
        except KeyError:
            raise KeyError("Missing destination InfluxDB details in scenario"
                           " section of the task definition file")

        try:
            vcmtsd_values_filepath = options['vcmtsd_values']
        except KeyError:
            raise KeyError("Missing vcmtsd_values key in scenario options"
                           " section of the task definition file")

        if not os.path.isfile(vcmtsd_values_filepath):
            raise RuntimeError("The vcmtsd_values file path provided "
                               "does not exists")

        # The yaml_loader.py (SafeLoader) underlying regex has an issue
        # with reading PCI addresses (processed as double). so the
        # BaseLoader is used here.
        with open(vcmtsd_values_filepath) as stream:
            vcmtsd_values = yaml.load(stream, Loader=yaml.BaseLoader)

        if vcmtsd_values is None:
            raise RuntimeError("Error reading vcmtsd_values file provided (" +
                               vcmtsd_values_filepath + ")")

        vnf_options = options.get(self.name, {})
        try:
            # sg_id is compared against values read from the file, hence the
            # conversion to str (presumably BaseLoader yields strings only).
            sg_id = str(vnf_options['sg_id'])
            stream_dir = vnf_options['stream_dir']
        except KeyError as missing:
            raise KeyError("Missing %s key in the %s scenario options "
                           "section of the task definition file"
                           % (missing.args[0], self.name))

        try:
            vcmts_pods_cfg = vcmtsd_values['topology']['vcmts_pods']
        except KeyError:
            raise KeyError("Missing vcmts_pods key in the "
                           "vcmtsd_values file provided")

        pod_cfg = self.extract_pod_cfg(vcmts_pods_cfg, sg_id)
        if pod_cfg is None:
            raise exceptions.IncorrectConfig(error_msg="Service group " + sg_id + " not found")

        self.setup_helper.run_vcmtsd(stream_dir, pod_cfg)

    def _update_collectd_options(self, scenario_cfg, context_cfg):
        """Merge the collectd options and hand them to the setup helper.

        The scenario-wide 'collectd' options are overridden first by the
        options given for this VNF in the scenario, then by those given for
        this VNF's node in the context.
        """
        scenario_options = scenario_cfg.get('options', {})
        generic_options = scenario_options.get('collectd', {})
        scenario_node_options = scenario_options.get(self.name, {})\
            .get('collectd', {})
        context_node_options = context_cfg.get('nodes', {})\
            .get(self.name, {}).get('collectd', {})

        # NOTE(review): `options` is the very dict stored in scenario_cfg
        # when a 'collectd' section exists, so the merges below are written
        # back into scenario_cfg as well.
        options = generic_options
        self._update_options(options, scenario_node_options)
        self._update_options(options, context_node_options)

        self.setup_helper.collectd_options = options

    def _update_options(self, options, additional_options):
        """Merge `additional_options` into `options` in place.

        Dict values are merged one level deep into an existing dict value;
        any other value replaces the existing one.
        """
        for k, v in additional_options.items():
            # Only merge when both sides are dicts; a non-dict existing
            # value is simply replaced instead of failing on .update().
            if isinstance(v, dict) and isinstance(options.get(k), dict):
                options[k].update(v)
            else:
                options[k] = v

    def wait_for_instantiate(self):
        """Do nothing."""

    def terminate(self):
        """Do nothing."""

    def scale(self, flavor=""):
        """Do nothing."""

    def collect_kpi(self):
        """Copy the new KPI points and return a placeholder result.

        Relies on the helper created by start_collect().
        """
        self.influxdb_helper.copy_kpi()
        return {"n/a": "n/a"}

    def start_collect(self):
        """Create and start the InfluxDB helper used by collect_kpi().

        Uses the InfluxDB address stored by instantiate().
        """
        self.influxdb_helper = InfluxDBHelper(self.vcmts_influxdb_ip,
                                              self.vcmts_influxdb_port)
        self.influxdb_helper.start()

    def stop_collect(self):
        """Do nothing."""
272     def stop_collect(self):
273         pass