Create a SampleVNF MQ consumer class
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / vpe_vnf.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ vPE (Provider Edge router) VNF model definitions based on ETSI Spec """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18
19
20 import os
21 import logging
22 import re
23 import posixpath
24
25 from six.moves import configparser, zip
26
27 from yardstick.common.process import check_if_process_failed
28 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
29 from yardstick.network_services.pipeline import PipelineRules
30 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
31 from yardstick.benchmark.contexts import base as ctx_base
32
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Command-line template used to start the vPE application; the placeholders
# are filled from the setup helper's ``pipeline_kwargs`` in ``build_config``.
VPE_PIPELINE_COMMAND = (
    "sudo {tool_path} -p {port_mask_hex} -f {cfg_file} -s {script} {hwlb}"
)

# Regular expression applied to the output of the per-pipeline
# "stats port" command. Capture groups, in order:
#   1 - packets in
#   2 - packets dropped by AH
#   3 - packets dropped by other
# The lines of the expected output are separated by "\r\n" and the second
# and third lines start with a tab.
VPE_COLLECT_KPI = (
    "Pkts in:\\s(\\d+)\r\n"
    "\tPkts dropped by AH:\\s(\\d+)\r\n"
    "\tPkts dropped by other:\\s(\\d+)"
)
42
43
class ConfigCreate(object):
    """Generate the vPE configuration file, CLI script and TM profile.

    The instance keeps running counters (``sw_q``, ``sink_q`` and
    ``n_pipeline``) that are shared by the upstream and downstream template
    processing, so that software queues, sinks and pipelines get unique,
    increasing identifiers across all port pairs.
    """

    @staticmethod
    def vpe_tmq(config, index):
        """Add the traffic manager section ``TM<index>`` to ``config``.

        :param config: config parser object to extend
        :param index: number used to build the section name
        :return: the same ``config`` object
        """
        tm_q = 'TM{0}'.format(index)
        config.add_section(tm_q)
        config.set(tm_q, 'burst_read', '24')
        config.set(tm_q, 'burst_write', '32')
        config.set(tm_q, 'cfg', '/tmp/full_tm_profile_10G.cfg')
        return config

    def __init__(self, vnfd_helper, socket):
        """Store the VNFD helper and reset the queue/pipeline counters.

        :param vnfd_helper: helper exposing ``port_pairs`` and ``port_num``
        :param socket: socket id, used in the master pipeline core string
        """
        super(ConfigCreate, self).__init__()
        # Counters start at -1 because they are incremented before first use.
        self.sw_q = -1
        self.sink_q = -1
        # PIPELINE0 is the master pipeline, generated ones start at 1.
        self.n_pipeline = 1
        self.vnfd_helper = vnfd_helper
        self.uplink_ports = self.vnfd_helper.port_pairs.uplink_ports
        self.downlink_ports = self.vnfd_helper.port_pairs.downlink_ports
        self.pipeline_per_port = 9
        self.socket = socket
        self._dpdk_port_to_link_id_map = None

    @property
    def dpdk_port_to_link_id_map(self):
        """Return the lazily built mapping of port name to LINK ID."""
        # We need interface name -> DPDK port num (PMD ID) -> LINK ID.
        # LINK ID -> PMD ID is governed by the port mask.
        # LINK instances are created implicitly based on the PORT_MASK
        # application startup argument. LINK0 is the first port enabled in
        # the PORT_MASK, LINK1 is the next one, etc. The LINK ID is different
        # from the DPDK PMD-level NIC port ID, which is the actual position
        # in the bitmask mentioned above. For example, if bit 5 is the first
        # bit set in the bitmask, then LINK0 has the PMD ID of 5. This
        # mechanism creates a contiguous LINK ID space and isolates the
        # configuration file against changes in the board PCIe slots where
        # NICs are plugged in.
        if self._dpdk_port_to_link_id_map is None:
            self._dpdk_port_to_link_id_map = {}
            for link_id, port_name in enumerate(sorted(self.vnfd_helper.port_pairs.all_ports,
                                                       key=self.vnfd_helper.port_num)):
                self._dpdk_port_to_link_id_map[port_name] = link_id
        return self._dpdk_port_to_link_id_map

    def vpe_initialize(self, config):
        """Add the fixed EAL, master pipeline and mempool sections.

        :param config: config parser object to extend
        :return: the same ``config`` object
        """
        config.add_section('EAL')
        config.set('EAL', 'log_level', '0')

        config.add_section('PIPELINE0')
        config.set('PIPELINE0', 'type', 'MASTER')
        config.set('PIPELINE0', 'core', 's%sC0' % self.socket)

        config.add_section('MEMPOOL0')
        config.set('MEMPOOL0', 'pool_size', '256K')

        config.add_section('MEMPOOL1')
        config.set('MEMPOOL1', 'pool_size', '2M')
        return config

    def vpe_rxq(self, config):
        """Add one ``RXQ<link id>.0`` section per downlink port.

        :param config: config parser object to extend
        :return: the same ``config`` object
        """
        for port in self.downlink_ports:
            new_section = 'RXQ{0}.0'.format(self.dpdk_port_to_link_id_map[port])
            config.add_section(new_section)
            config.set(new_section, 'mempool', 'MEMPOOL1')

        return config

    def get_sink_swq(self, parser, pipeline, k, index):
        """Build the software queue value replacing option ``k``.

        The current value of the option decides the suffix: a value
        containing "SINK" consumes the next sink number, a value containing
        "TM" uses ``TM<index>`` (and wins if both are present).

        :param parser: config parser holding the template
        :param pipeline: section name
        :param k: option name whose current value is inspected
        :param index: port pair index, used for the TM suffix
        :return: ``SWQ<sw_q>`` followed by the optional suffix
        """
        sink = ""
        pktq = parser.get(pipeline, k)
        if "SINK" in pktq:
            self.sink_q += 1
            sink = " SINK{0}".format(self.sink_q)
        if "TM" in pktq:
            sink = " TM{0}".format(index)
        pktq = "SWQ{0}{1}".format(self.sw_q, sink)
        return pktq

    def _renumber_pipeline(self, parser, pipeline):
        """Rename section ``pipeline`` to ``PIPELINE<n_pipeline>`` and advance the counter."""
        new_pipeline = 'PIPELINE{0}'.format(self.n_pipeline)
        if new_pipeline != pipeline:
            # The parser offers no public rename operation, so the section
            # storage is updated directly.
            parser._sections[new_pipeline] = parser._sections[pipeline]
            parser._sections.pop(pipeline)
        self.n_pipeline += 1

    def vpe_upstream(self, vnf_cfg, index=0):  # pragma: no cover
        """Load the ``vpe_upstream`` template and adapt it to one port pair.

        ``pktq_in``/``pktq_out`` options are rewritten: RXQ entries point to
        the uplink port, TXQ entries point to the downlink port and anything
        else becomes a software queue. Every section is then renumbered.

        :param vnf_cfg: directory containing the ``vpe_upstream`` file
        :param index: index of the port pair in the uplink/downlink lists
        :return: the config parser holding the adapted sections
        """
        # NOTE(ralonsoh): this function must be covered in UTs.
        parser = configparser.ConfigParser()
        parser.read(os.path.join(vnf_cfg, 'vpe_upstream'))

        for pipeline in parser.sections():
            for k, v in parser.items(pipeline):
                if k == "pktq_in":
                    if "RXQ" in v:
                        port = self.dpdk_port_to_link_id_map[self.uplink_ports[index]]
                        value = "RXQ{0}.0".format(port)
                    else:
                        value = self.get_sink_swq(parser, pipeline, k, index)

                    parser.set(pipeline, k, value)

                elif k == "pktq_out":
                    if "TXQ" in v:
                        port = self.dpdk_port_to_link_id_map[self.downlink_ports[index]]
                        value = "TXQ{0}.0".format(port)
                    else:
                        # A new software queue is allocated for every output
                        # that is not a TX queue.
                        self.sw_q += 1
                        value = self.get_sink_swq(parser, pipeline, k, index)

                    parser.set(pipeline, k, value)

            self._renumber_pipeline(parser, pipeline)
        return parser

    def vpe_downstream(self, vnf_cfg, index):  # pragma: no cover
        """Load the ``vpe_downstream`` template and adapt it to one port pair.

        Mirror of :meth:`vpe_upstream`: RXQ entries point to the downlink
        port, TXQ entries point to the uplink port, and a "TM" marker in an
        RXQ/TXQ value is kept as ``TM<index>``.

        :param vnf_cfg: directory containing the ``vpe_downstream`` file
        :param index: index of the port pair in the uplink/downlink lists
        :return: the config parser holding the adapted sections
        """
        # NOTE(ralonsoh): this function must be covered in UTs.
        parser = configparser.ConfigParser()
        parser.read(os.path.join(vnf_cfg, 'vpe_downstream'))
        for pipeline in parser.sections():
            for k, v in parser.items(pipeline):

                if k == "pktq_in":
                    port = self.dpdk_port_to_link_id_map[self.downlink_ports[index]]
                    if "RXQ" not in v:
                        value = self.get_sink_swq(parser, pipeline, k, index)
                    elif "TM" in v:
                        value = "RXQ{0}.0 TM{1}".format(port, index)
                    else:
                        value = "RXQ{0}.0".format(port)

                    parser.set(pipeline, k, value)

                elif k == "pktq_out":
                    port = self.dpdk_port_to_link_id_map[self.uplink_ports[index]]
                    if "TXQ" not in v:
                        self.sw_q += 1
                        value = self.get_sink_swq(parser, pipeline, k, index)
                    elif "TM" in v:
                        value = "TXQ{0}.0 TM{1}".format(port, index)
                    else:
                        value = "TXQ{0}.0".format(port)

                    parser.set(pipeline, k, value)

            self._renumber_pipeline(parser, pipeline)
        return parser

    def create_vpe_config(self, vnf_cfg):
        """Write the complete vPE configuration to ``/tmp/vpe_config``.

        The file contains the fixed sections followed by, for each uplink
        port, the adapted upstream sections, the adapted downstream sections
        and the matching TM section.

        :param vnf_cfg: directory containing the pipeline templates
        """
        config = configparser.ConfigParser()
        vpe_cfg = "/tmp/vpe_config"
        with open(vpe_cfg, 'w') as cfg_file:
            config = self.vpe_initialize(config)
            config = self.vpe_rxq(config)
            config.write(cfg_file)
            for index, _ in enumerate(self.uplink_ports):
                config = self.vpe_upstream(vnf_cfg, index)
                config.write(cfg_file)
                config = self.vpe_downstream(vnf_cfg, index)
                config = self.vpe_tmq(config, index)
                config.write(cfg_file)

    def generate_vpe_script(self, interfaces):
        """Build the pipeline rules script for every uplink/downlink pair.

        :param interfaces: list of interface dicts; each one must provide
            "name" and a "virtual-interface" dict with "dst_ip" and "dst_mac"
        :return: the script text produced by ``PipelineRules.get_string()``
        """
        rules = PipelineRules(pipeline_id=1)
        for uplink_port, downlink_port in zip(self.uplink_ports, self.downlink_ports):

            uplink_intf = \
                next(intf["virtual-interface"] for intf in interfaces
                     if intf["name"] == uplink_port)
            downlink_intf = \
                next(intf["virtual-interface"] for intf in interfaces
                     if intf["name"] == downlink_port)

            dst_port0_ip = uplink_intf["dst_ip"]
            dst_port1_ip = downlink_intf["dst_ip"]
            dst_port0_mac = uplink_intf["dst_mac"]
            dst_port1_mac = downlink_intf["dst_mac"]

            rules.add_firewall_script(dst_port0_ip)
            rules.next_pipeline()
            rules.add_flow_classification_script()
            rules.next_pipeline()
            rules.add_flow_action()
            rules.next_pipeline()
            rules.add_flow_action2()
            rules.next_pipeline()
            rules.add_route_script(dst_port1_ip, dst_port1_mac)
            rules.next_pipeline()
            rules.add_route_script2(dst_port0_ip, dst_port0_mac)
            rules.next_pipeline(num=4)

        return rules.get_string()

    def generate_tm_cfg(self, vnf_cfg):
        """Return the content of the traffic manager profile, if present.

        :param vnf_cfg: directory expected to hold "full_tm_profile_10G.cfg"
        :return: the file content, or None when the file does not exist
        """
        tm_path = os.path.join(vnf_cfg, "full_tm_profile_10G.cfg")
        if not os.path.exists(tm_path):
            return None
        # Use a context manager so the file handle is always closed.
        with open(tm_path) as handle:
            return handle.read()
238
239
class VpeApproxSetupEnvHelper(DpdkVnfSetupEnvHelper):
    """Setup helper that generates and uploads the vPE configuration files."""

    APP_NAME = 'vPE_vnf'
    CFG_CONFIG = "/tmp/vpe_config"
    CFG_SCRIPT = "/tmp/vpe_script"
    TM_CONFIG = "/tmp/full_tm_profile_10G.cfg"
    CORES = ['0', '1', '2', '3', '4', '5']
    PIPELINE_COMMAND = VPE_PIPELINE_COMMAND

    def _build_vnf_ports(self):
        """Derive the uplink, downlink and complete port lists from the VNFD."""
        pairs = PortPairs(self.vnfd_helper.interfaces)
        self._port_pairs = pairs
        self.uplink_ports = pairs.uplink_ports
        self.downlink_ports = pairs.downlink_ports
        self.all_ports = pairs.all_ports

    def build_config(self):
        """Generate the vPE files, upload them and return the start command.

        Three files are uploaded through the SSH helper, in this order: the
        pipeline configuration, the rules script and the TM profile.

        :return: ``PIPELINE_COMMAND`` formatted with ``pipeline_kwargs``
        """
        # Values substituted into the generated config and script texts.
        substitutions = dict(
            bin_path=self.ssh_helper.bin_path,
            socket=self.socket,
        )

        self._build_vnf_ports()
        creator = ConfigCreate(self.vnfd_helper, self.socket)
        templates_dir = self.scenario_helper.vnf_cfg
        creator.create_vpe_config(templates_dir)

        # create_vpe_config() wrote the file locally; read it back to upload.
        with open(self.CFG_CONFIG) as local_cfg:
            config_text = local_cfg.read()
        self.ssh_helper.upload_config_file(
            posixpath.basename(self.CFG_CONFIG),
            config_text.format(**substitutions))

        script_text = creator.generate_vpe_script(self.vnfd_helper.interfaces)
        self.ssh_helper.upload_config_file(
            posixpath.basename(self.CFG_SCRIPT),
            script_text.format(**substitutions))

        tm_text = creator.generate_tm_cfg(self.scenario_helper.vnf_cfg)
        self.ssh_helper.upload_config_file(
            posixpath.basename(self.TM_CONFIG), tm_text)

        LOG.info("Provision and start the %s", self.APP_NAME)
        for remote_file in (self.CFG_CONFIG, self.CFG_SCRIPT):
            LOG.info(remote_file)
        self._build_pipeline_kwargs()
        return self.PIPELINE_COMMAND.format(**self.pipeline_kwargs)
284
285
class VpeApproxVnf(SampleVNF):
    """Model-driver for the vPE VNF: setup helper selection and KPI collection."""

    APP_NAME = 'vPE_vnf'
    APP_WORD = 'vpe'
    COLLECT_KPI = VPE_COLLECT_KPI
    WAIT_TIME = 20

    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
                 resource_helper_type=None):
        """Initialise the VNF, defaulting to the vPE setup helper."""
        helper_type = setup_env_helper_type
        if helper_type is None:
            helper_type = VpeApproxSetupEnvHelper
        super(VpeApproxVnf, self).__init__(
            name, vnfd, task_id, helper_type, resource_helper_type)

    def get_stats(self, *args, **kwargs):
        """Not supported by this VNF."""
        raise NotImplementedError

    def collect_kpi(self):
        """Collect packet counters for the upstream and downstream pipelines.

        For each direction the "in" and "out" port statistics of one pipeline
        are queried and accumulated; outputs that do not match
        ``COLLECT_KPI`` are ignored.

        :return: dict with the physical node, the in/drop counters per
            direction and the resource helper statistics
        """
        # We can't get KPIs if the VNF is down.
        check_if_process_failed(self._vnf_process)
        node = ctx_base.Context.get_physical_node_from_server(
            self.scenario_helper.nodes[self.name])

        result = {
            "physical_node": node,
            'pkt_in_up_stream': 0,
            'pkt_drop_up_stream': 0,
            'pkt_in_down_stream': 0,
            'pkt_drop_down_stream': 0,
            'collect_stats': self.resource_helper.collect_kpi(),
        }

        stats_cmd = 'p {0} stats port {1} 0'
        # (pipeline number queried, direction it is reported as)
        for pipeline_id, direction in ((5, 'up'), (9, 'down')):
            in_key = "pkt_in_{0}_stream".format(direction)
            drop_key = "pkt_drop_{0}_stream".format(direction)
            for mode in ('in', 'out'):
                output = self.vnf_execute(stats_cmd.format(pipeline_id, mode))
                found = re.search(self.COLLECT_KPI, output, re.MULTILINE)
                if found:
                    # Group 1 is the packets-in counter; groups 2 and 3 are
                    # the two drop counters.
                    result[in_key] += int(found.group(1))
                    result[drop_key] += int(found.group(2)) + int(found.group(3))

        LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
        return result