1 # Copyright (c) 2019 Viosoft Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import logging
import os

from influxdb import InfluxDBClient
import yaml

from yardstick.network_services.vnf_generic.vnf.sample_vnf import SetupEnvHelper
from yardstick.common import constants
from yardstick.common import exceptions
from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper
from yardstick.network_services.vnf_generic.vnf.vnf_ssh_helper import VnfSshHelper
from yardstick.network_services.utils import get_nsb_option


# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class InfluxDBHelper(object):
    """Copy measurement points from one InfluxDB instance to another.

    Points are read from the InfluxDB at the address given to the
    constructor and written to the InfluxDB at ``constants.INFLUXDB_IP`` /
    ``constants.INFLUXDB_PORT``. For each measurement the timestamp of the
    last point copied is remembered, so every pass only queries newer points.
    """

    # Lower time bound used for a measurement that has not been copied yet.
    INITIAL_VALUE = 'now() - 1m'

    # (measurement, columns copied into the point's "fields") pairs handled
    # by copy_kpi(), in the order they are processed.
    _KPI_MEASUREMENTS = (
        ("cpu_value", ["instance", "type_instance", "value"]),
        ("cpufreq_value", ["type_instance", "value"]),
        ("downstream_rx", ["value"]),
        ("downstream_tx", ["value"]),
        ("downstream_value", ["value"]),
        ("ds_per_cm_value", ["instance", "value"]),
        ("intel_rdt_value", ["instance", "type_instance", "value"]),
        ("turbostat_value", ["instance", "type_instance", "value"]),
        ("upstream_rx", ["value"]),
        ("upstream_tx", ["value"]),
        ("upstream_value", ["value"]),
    )

    def __init__(self, vcmts_influxdb_ip, vcmts_influxdb_port):
        """Store the address of the InfluxDB to read from.

        No connection is created here; see :meth:`start`.

        :param vcmts_influxdb_ip: host of the InfluxDB instance to read from
        :param vcmts_influxdb_port: port of the InfluxDB instance to read from
        """
        self._vcmts_influxdb_ip = vcmts_influxdb_ip
        self._vcmts_influxdb_port = vcmts_influxdb_port
        self._last_upstream_rx = self.INITIAL_VALUE
        # measurement name -> quoted timestamp of the last point copied
        self._last_values_time = dict()

    def start(self):
        """Create the InfluxDB clients used for reading and for writing."""
        # NOTE(review): the third client argument was not recoverable from
        # the source; database='collectd' is assumed from the collectd-style
        # measurement names handled by copy_kpi() -- confirm.
        self._read_client = InfluxDBClient(host=self._vcmts_influxdb_ip,
                                           port=self._vcmts_influxdb_port,
                                           database='collectd')
        self._write_client = InfluxDBClient(host=constants.INFLUXDB_IP,
                                            port=constants.INFLUXDB_PORT,
                                            database='collectd')

    def _get_last_value_time(self, measurement):
        """Return the lower time bound to use when querying *measurement*.

        :returns: the quoted timestamp stored by ``_set_last_value_time`` or
                  ``INITIAL_VALUE`` when nothing has been stored yet
        """
        return self._last_values_time.get(measurement, self.INITIAL_VALUE)

    def _set_last_value_time(self, measurement, time):
        """Remember *time* (a string) as the last copied timestamp.

        The value is stored wrapped in single quotes so it can be placed
        directly in the query text built by ``_query_measurement``.
        """
        self._last_values_time[measurement] = "'" + time + "'"

    def _query_measurement(self, measurement):
        """Fetch the points of *measurement* newer than the last copied one.

        :returns: an iterable of points, or ``None`` when the query result
                  contains no series
        """
        # There is a delay before influxdb flushes the data
        query = "SELECT * FROM {} WHERE time > {} ORDER BY time ASC;".format(
            measurement, self._get_last_value_time(measurement))
        query_result = self._read_client.query(query)
        if not query_result.keys():
            return None
        return query_result.get_points(measurement)

    def _rw_measurment(self, measurement, columns):
        """Read the new points of *measurement* and write them out.

        :param measurement: name of the measurement to copy
        :param columns: names of the entry columns copied into "fields"
        """
        query_result = self._query_measurement(measurement)
        if query_result is None:
            return

        points_to_write = list()
        newest_time = None
        for entry in query_result:
            fields = {}
            for column in columns:
                # NOTE(review): the condition guarding the float() conversion
                # was not recoverable from the source; converting only the
                # 'value' column is assumed -- confirm.
                if column == 'value':
                    fields[column] = float(entry[column])
                else:
                    fields[column] = entry[column]

            points_to_write.append({
                "measurement": measurement,
                "tags": {
                    "type": entry['type'],
                    # NOTE(review): a second tag followed "type" in the
                    # source but was not recoverable; 'host' is assumed --
                    # confirm.
                    "host": entry['host'],
                },
                "time": entry['time'],
                "fields": fields,
            })
            # The query is ordered by ascending time, so the last entry seen
            # carries the newest timestamp.
            newest_time = entry['time']

        if not points_to_write:
            return

        # Write the points to yardstick database
        if self._write_client.write_points(points_to_write):
            # Advance the bookmark only once the write went through, so a
            # failed write is retried on the next pass instead of silently
            # dropping these points.
            self._set_last_value_time(measurement, newest_time)
            LOG.debug("%d new points written to '%s' measurement",
                      len(points_to_write), measurement)

    def copy_kpi(self):
        """Copy the new points of every measurement in _KPI_MEASUREMENTS."""
        for measurement, columns in self._KPI_MEASUREMENTS:
            self._rw_measurment(measurement, columns)
class VcmtsdSetupEnvHelper(SetupEnvHelper):
    """Build the vcmtsd launch command for a pod and send it over SSH."""

    # Environment exported in front of every command that is built.
    BASE_PARAMETERS = "export LD_LIBRARY_PATH=/opt/collectd/lib:;"\
                    + "export CMK_PROC_FS=/host/proc;"

    def _build_parameters(self, pod_cfg, pool, script, extra_args, net_key):
        """Assemble the command line shared by both stream directions.

        :param pod_cfg: mapping with the pod settings; values are
                        concatenated as strings
        :param pool: value placed after ``--pool=``
        :param script: path of the run script to execute
        :param extra_args: script arguments placed right after the service
                           group id
        :param net_key: key of *pod_cfg* holding the network argument
        :returns: the full command line as a string
        """
        script_args = [pod_cfg['sg_id']] + list(extra_args) + [
            pod_cfg['num_ofdm'] + "ofdm",
            pod_cfg['num_subs'] + "cm",
            pod_cfg['cm_crypto'],
            pod_cfg['qat'],
            pod_cfg[net_key],
            pod_cfg['power_mgmt'],
        ]
        return self.BASE_PARAMETERS + " " \
            + " /opt/bin/cmk isolate --conf-dir=/etc/cmk" \
            + " --socket-id=" + pod_cfg['cpu_socket_id'] \
            + " --pool=" + pool \
            + " " + script + " " + " ".join(script_args)

    def build_us_parameters(self, pod_cfg):
        """Return the command line that runs the upstream script.

        :param pod_cfg: mapping with the pod settings
        """
        # NOTE(review): the --pool argument of the upstream command was not
        # recoverable from the source; 'shared' is assumed -- confirm.
        return self._build_parameters(
            pod_cfg,
            pool="shared",
            script="/vcmts-config/run_upstream.sh",
            extra_args=[pod_cfg['ds_core_type']],
            net_key='net_us')

    def build_ds_parameters(self, pod_cfg):
        """Return the command line that runs the downstream script.

        :param pod_cfg: mapping with the pod settings
        """
        return self._build_parameters(
            pod_cfg,
            pool=pod_cfg['ds_core_type'],
            script="/vcmts-config/run_downstream.sh",
            extra_args=[pod_cfg['ds_core_type'],
                        pod_cfg['ds_core_pool_index']],
            net_key='net_ds')

    def build_cmd(self, stream_dir, pod_cfg):
        """Return the downstream command for ``'ds'``, else the upstream one.

        :param stream_dir: stream direction selector
        :param pod_cfg: mapping with the pod settings
        """
        if stream_dir == 'ds':
            return self.build_ds_parameters(pod_cfg)
        return self.build_us_parameters(pod_cfg)

    def run_vcmtsd(self, stream_dir, pod_cfg):
        """Build the command for *stream_dir* and send it via the SSH helper.

        :param stream_dir: stream direction selector, see :meth:`build_cmd`
        :param pod_cfg: mapping with the pod settings
        """
        cmd = self.build_cmd(stream_dir, pod_cfg)
        LOG.debug("Executing %s", cmd)
        self.ssh_helper.send_command(cmd)

    def setup_vnf_environment(self):
        """Prepare the VNF environment."""
        # NOTE(review): the body of this method was not recoverable from the
        # source; it is left as a no-op -- confirm.
        pass
class VcmtsVNF(GenericVNF):
    """VNF that starts vcmtsd for one service group and relays its KPIs.

    ``instantiate`` reads the scenario options and the vcmtsd values file,
    picks the pod configuration of the requested service group and asks the
    setup helper to run vcmtsd. KPIs are copied between InfluxDB instances
    through an :class:`InfluxDBHelper` created in ``start_collect``.
    """

    def __init__(self, name, vnfd):
        """Create the scenario, SSH and setup helpers of the VNF.

        :param name: VNF name, forwarded to the base class
        :param vnfd: VNF descriptor, forwarded to the base class
        """
        super(VcmtsVNF, self).__init__(name, vnfd)

        self.bin_path = get_nsb_option('bin_path', '')
        self.scenario_helper = ScenarioHelper(self.name)
        self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path)

        # NOTE(review): the middle constructor argument was not recoverable
        # from the source; self.ssh_helper is assumed because
        # VcmtsdSetupEnvHelper.run_vcmtsd() uses self.ssh_helper -- confirm.
        self.setup_helper = VcmtsdSetupEnvHelper(self.vnfd_helper,
                                                 self.ssh_helper,
                                                 self.scenario_helper)

    def extract_pod_cfg(self, vcmts_pods_cfg, sg_id):
        """Return the first pod configuration whose ``sg_id`` equals *sg_id*.

        :param vcmts_pods_cfg: iterable of pod configuration mappings
        :param sg_id: service group id to look for
        :returns: the matching mapping, or ``None`` when there is no match
        """
        for pod_cfg in vcmts_pods_cfg:
            if pod_cfg['sg_id'] == sg_id:
                return pod_cfg
        return None

    def instantiate(self, scenario_cfg, context_cfg):
        """Read the configuration and start vcmtsd for this VNF.

        :param scenario_cfg: scenario definition; its ``options`` mapping
                             must hold ``vcmts_influxdb_ip``,
                             ``vcmts_influxdb_port``, ``vcmtsd_values`` and
                             a per-VNF entry with ``sg_id`` and
                             ``stream_dir``
        :param context_cfg: context definition, stored on the instance
        :raises KeyError: when a required option or the ``vcmts_pods`` key of
                          the values file is missing
        :raises RuntimeError: when the values file does not exist or loads
                              as ``None``
        :raises exceptions.IncorrectConfig: when no pod matches the service
                                            group id
        """
        # NOTE(review): the try/except frames around the lookups below were
        # not visible in the source; they are reconstructed from the
        # ``raise KeyError`` statements that followed each lookup -- confirm.
        self._update_collectd_options(scenario_cfg, context_cfg)
        self.scenario_helper.scenario_cfg = scenario_cfg
        self.context_cfg = context_cfg

        options = scenario_cfg.get('options', {})

        try:
            self.vcmts_influxdb_ip = options['vcmts_influxdb_ip']
            self.vcmts_influxdb_port = options['vcmts_influxdb_port']
        except KeyError:
            raise KeyError("Missing destination InfluxDB details in scenario"
                           " section of the task definition file")

        try:
            vcmtsd_values_filepath = options['vcmtsd_values']
        except KeyError:
            # A space was missing between "options" and "section".
            raise KeyError("Missing vcmtsd_values key in scenario options"
                           " section of the task definition file")

        if not os.path.isfile(vcmtsd_values_filepath):
            # NOTE(review): the tail of this message was not recoverable
            # from the source -- confirm the wording.
            raise RuntimeError("The vcmtsd_values file path provided "
                               "does not exist")

        # The yaml_loader.py (SafeLoader) underlying regex has an issue
        # with reading PCI addresses (processed as double). so the
        # BaseLoader is used here.
        with open(vcmtsd_values_filepath) as stream:
            vcmtsd_values = yaml.load(stream, Loader=yaml.BaseLoader)

        if vcmtsd_values is None:
            raise RuntimeError("Error reading vcmtsd_values file provided (" +
                               vcmtsd_values_filepath + ")")

        vnf_options = options.get(self.name, {})
        sg_id = str(vnf_options['sg_id'])
        stream_dir = vnf_options['stream_dir']

        try:
            vcmts_pods_cfg = vcmtsd_values['topology']['vcmts_pods']
        except KeyError:
            raise KeyError("Missing vcmts_pods key in the "
                           "vcmtsd_values file provided")

        pod_cfg = self.extract_pod_cfg(vcmts_pods_cfg, sg_id)
        if pod_cfg is None:
            raise exceptions.IncorrectConfig(error_msg="Service group " + sg_id + " not found")

        self.setup_helper.run_vcmtsd(stream_dir, pod_cfg)

    def _update_collectd_options(self, scenario_cfg, context_cfg):
        """Merge the collectd options and hand them to the setup helper.

        The generic scenario options are updated first with the scenario
        options of this VNF and then with the context node options of this
        VNF.
        """
        scenario_options = scenario_cfg.get('options', {})
        generic_options = scenario_options.get('collectd', {})
        scenario_node_options = scenario_options.get(self.name, {})\
            .get('collectd', {})
        context_node_options = context_cfg.get('nodes', {})\
            .get(self.name, {}).get('collectd', {})

        # NOTE(review): ``options`` aliases the dict stored in scenario_cfg,
        # so the merge below also modifies scenario_cfg in place.
        options = generic_options
        self._update_options(options, scenario_node_options)
        self._update_options(options, context_node_options)

        self.setup_helper.collectd_options = options

    def _update_options(self, options, additional_options):
        """Merge *additional_options* into *options* in place.

        :param options: mapping that is updated
        :param additional_options: mapping whose items are merged in
        """
        # NOTE(review): both branch bodies were not recoverable from the
        # source; updating an existing nested dict and assigning otherwise is
        # assumed from the visible condition -- confirm.
        for k, v in additional_options.items():
            if isinstance(v, dict) and k in options:
                options[k].update(v)
            else:
                options[k] = v

    def wait_for_instantiate(self):
        """Wait for the VNF to be instantiated."""
        # NOTE(review): the body of this method was not recoverable from the
        # source; it is left as a no-op -- confirm.
        pass

    def scale(self, flavor=""):
        """Scale the VNF.

        :param flavor: flavor selector; unused in this reconstruction
        """
        # NOTE(review): the body of this method was not recoverable from the
        # source; it is left as a no-op -- confirm.
        pass

    def collect_kpi(self):
        """Trigger a KPI copy and return a placeholder result.

        :returns: ``{"n/a": "n/a"}``; the KPIs themselves are written by the
                  InfluxDB helper rather than returned
        """
        self.influxdb_helper.copy_kpi()
        return {"n/a": "n/a"}

    def start_collect(self):
        """Create and start the InfluxDB helper used by ``collect_kpi``."""
        self.influxdb_helper = InfluxDBHelper(self.vcmts_influxdb_ip,
                                              self.vcmts_influxdb_port)
        self.influxdb_helper.start()
272 def stop_collect(self):