Merge "Add CPU pinning support for node context"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / tg_trex.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Trex acts as traffic generation and vnf definitions based on ETSI Spec """
15
16 from __future__ import absolute_import
17 from __future__ import print_function
18 import multiprocessing
19 import time
20 import logging
21 import os
22 import yaml
23
24 from yardstick import ssh
25 from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
26 from yardstick.network_services.utils import get_nsb_option
27 from yardstick.network_services.utils import provision_tool
28 from stl.trex_stl_lib.trex_stl_client import STLClient
29 from stl.trex_stl_lib.trex_stl_client import LoggerApi
30 from stl.trex_stl_lib.trex_stl_exceptions import STLError
31
32 LOG = logging.getLogger(__name__)
33 DURATION = 30
34 WAIT_QUEUE = 1
35 TREX_SYNC_PORT = 4500
36 TREX_ASYNC_PORT = 4501
37
38
39 class TrexTrafficGen(GenericTrafficGen):
40     """
41     This class handles mapping traffic profile and generating
42     traffic for given testcase
43     """
44
45     def __init__(self, vnfd):
46         super(TrexTrafficGen, self).__init__(vnfd)
47         self._result = {}
48         self._queue = multiprocessing.Queue()
49         self._terminated = multiprocessing.Value('i', 0)
50         self._traffic_process = None
51         self._vpci_ascending = None
52         self.client = None
53         self.my_ports = None
54         self.client_started = multiprocessing.Value('i', 0)
55
56         mgmt_interface = self.vnfd["mgmt-interface"]
57         ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
58         self.connection = ssh.SSH(mgmt_interface["user"],
59                                   mgmt_interface["ip"],
60                                   password=mgmt_interface["password"],
61                                   port=ssh_port)
62         self.connection.wait()
63
64     @classmethod
65     def _split_mac_address_into_list(cls, mac):
66         octets = mac.split(':')
67         for i, elem in enumerate(octets):
68             octets[i] = "0x" + str(elem)
69         return octets
70
71     def _generate_trex_cfg(self, vnfd):
72         """
73
74         :param vnfd: vnfd.yaml
75         :return: trex_cfg.yaml file
76         """
77         trex_cfg = dict(
78             port_limit=0,
79             version='2',
80             interfaces=[],
81             port_info=list(dict(
82             ))
83         )
84         trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
85         trex_cfg["version"] = '2'
86
87         cfg_file = []
88         vpci = []
89         port = {}
90
91         for interface in range(len(vnfd["vdu"][0]["external-interface"])):
92             ext_intrf = vnfd["vdu"][0]["external-interface"]
93             virtual_interface = ext_intrf[interface]["virtual-interface"]
94             vpci.append(virtual_interface["vpci"])
95
96             port["src_mac"] = self._split_mac_address_into_list(
97                 virtual_interface["local_mac"])
98             port["dest_mac"] = self._split_mac_address_into_list(
99                 virtual_interface["dst_mac"])
100
101             trex_cfg["port_info"].append(port.copy())
102
103         trex_cfg["interfaces"] = vpci
104         cfg_file.append(trex_cfg)
105
106         with open('/tmp/trex_cfg.yaml', 'w') as outfile:
107             outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
108         self.connection.put('/tmp/trex_cfg.yaml', '/etc')
109
110         self._vpci_ascending = sorted(vpci)
111
112     @classmethod
113     def __setup_hugepages(cls, connection):
114         hugepages = \
115             connection.execute(
116                 "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
117         hugepages = hugepages.rstrip()
118
119         memory_path = \
120             '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
121         connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
122
123         pages = 16384 if hugepages.rstrip() == "2048kB" else 16
124         connection.execute("echo %s > %s" % (pages, memory_path))
125
126     def setup_vnf_environment(self, connection):
127         ''' setup dpdk environment needed for vnf to run '''
128
129         self.__setup_hugepages(connection)
130         connection.execute("modprobe uio && modprobe igb_uio")
131
132         exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
133         if exit_status == 0:
134             return
135
136         dpdk = os.path.join(self.bin_path, "dpdk-16.07")
137         dpdk_setup = \
138             provision_tool(self.connection,
139                            os.path.join(self.bin_path, "nsb_setup.sh"))
140         status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
141         if status:
142             connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
143
144     def scale(self, flavor=""):
145         ''' scale vnfbased on flavor input '''
146         super(TrexTrafficGen, self).scale(flavor)
147
148     def instantiate(self, scenario_cfg, context_cfg):
149         self._generate_trex_cfg(self.vnfd)
150         self.setup_vnf_environment(self.connection)
151
152         trex = os.path.join(self.bin_path, "trex")
153         err = \
154             self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
155         if err != 0:
156             LOG.info("Copying trex to destination...")
157             self.connection.put("/root/.bash_profile", "/root/.bash_profile")
158             self.connection.put(trex, trex, True)
159             ko_src = os.path.join(trex, "scripts/ko/src/")
160             self.connection.execute("cd %s && make && make install" % ko_src)
161
162         LOG.info("Starting TRex server...")
163         _tg_process = \
164             multiprocessing.Process(target=self._start_server)
165         _tg_process.start()
166         while True:
167             if not _tg_process.is_alive():
168                 raise RuntimeError("Traffic Generator process died.")
169             LOG.info("Waiting for TG Server to start.. ")
170             time.sleep(1)
171             status = \
172                 self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
173             if status == 0:
174                 LOG.info("TG server is up and running.")
175                 return _tg_process.exitcode
176
177     def listen_traffic(self, traffic_profile):
178         pass
179
180     def _get_logical_if_name(self, vpci):
181         ext_intf = self.vnfd["vdu"][0]["external-interface"]
182         for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
183             virtual_intf = ext_intf[interface]["virtual-interface"]
184             if virtual_intf["vpci"] == vpci:
185                 return ext_intf[interface]["name"]
186
187     def run_traffic(self, traffic_profile):
188         self._traffic_process = \
189             multiprocessing.Process(target=self._traffic_runner,
190                                     args=(traffic_profile, self._queue,
191                                           self.client_started,
192                                           self._terminated))
193         self._traffic_process.start()
194         # Wait for traffic process to start
195         while self.client_started.value == 0:
196             time.sleep(1)
197
198         return self._traffic_process.is_alive()
199
200     def _start_server(self):
201         mgmt_interface = self.vnfd["mgmt-interface"]
202         ssh_port = mgmt_interface.get("ssh_port", ssh.DEFAULT_PORT)
203         _server = ssh.SSH(mgmt_interface["user"], mgmt_interface["ip"],
204                           password=mgmt_interface["password"],
205                           port=ssh_port)
206         _server.wait()
207
208         _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
209                         (TREX_SYNC_PORT, TREX_ASYNC_PORT))
210
211         trex_path = os.path.join(self.bin_path, "trex/scripts")
212         path = get_nsb_option("trex_path", trex_path)
213         trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
214
215         _server.execute(trex_cmd)
216
217     def _connect_client(self, client=None):
218         if client is None:
219             client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
220                                server=self.vnfd["mgmt-interface"]["ip"],
221                                verbose_level=LoggerApi.VERBOSE_QUIET)
222         # try to connect with 5s intervals, 30s max
223         for idx in range(6):
224             try:
225                 client.connect()
226                 break
227             except STLError:
228                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
229                 time.sleep(5)
230         return client
231
232     def _traffic_runner(self, traffic_profile, queue,
233                         client_started, terminated):
234         LOG.info("Starting TRex client...")
235
236         self.my_ports = [0, 1]
237         self.client = self._connect_client()
238         self.client.reset(ports=self.my_ports)
239
240         self.client.remove_all_streams(self.my_ports)  # remove all streams
241
242         while not terminated.value:
243             traffic_profile.execute(self)
244             client_started.value = 1
245             last_res = self.client.get_stats(self.my_ports)
246             if not isinstance(last_res, dict):  # added for mock unit test
247                 terminated.value = 1
248                 last_res = {}
249
250             samples = {}
251             for vpci_idx in range(len(self._vpci_ascending)):
252                 name = \
253                     self._get_logical_if_name(self._vpci_ascending[vpci_idx])
254                 # fixme: VNFDs KPIs values needs to be mapped to TRex structure
255                 xe_value = last_res.get(vpci_idx, {})
256                 samples[name] = \
257                     {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
258                      "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
259                      "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
260                      "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
261                      "in_packets": xe_value.get("ipackets", 0),
262                      "out_packets": xe_value.get("opackets", 0)}
263             time.sleep(WAIT_QUEUE)
264             queue.put(samples)
265
266         self.client.disconnect()
267         terminated.value = 0
268
269     def collect_kpi(self):
270         if not self._queue.empty():
271             self._result.update(self._queue.get())
272         LOG.debug("trex collect Kpis %s", self._result)
273         return self._result
274
275     def terminate(self):
276         self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
277                                 (TREX_SYNC_PORT, TREX_ASYNC_PORT))
278         self.traffic_finished = True
279         if self._traffic_process:
280             self._traffic_process.terminate()