1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
21 from yardstick.common import messaging
22 from yardstick.common.messaging import payloads
23 from yardstick.common.messaging import producer
24 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
27 LOG = logging.getLogger(__name__)
30 class QueueFileWrapper(object):
31 """ Class providing file-like API for talking with SSH connection """
33 def __init__(self, q_in, q_out, prompt):
42 """ read chunk from input queue """
43 if self.q_in.qsize() > 0 and size:
44 in_data = self.q_in.get()
47 def write(self, chunk):
48 """ write chunk to output queue """
49 self.buf.append(chunk)
50 # flush on prompt or if we exceed bufsize
52 size = sum(len(c) for c in self.buf)
53 if self.prompt in chunk or size > self.bufsize:
54 out = ''.join(self.buf)
59 """ close multiprocessing queue """
64 while self.q_out.qsize() > 0:
68 class VnfdHelper(dict):
def __init__(self, *args, **kwargs):
    """Build the VNFD mapping and derive its port-pair view.

    All positional and keyword arguments are handed to the ``dict``
    constructor unchanged. The mapping is expected to contain
    ``['vdu'][0]['external-interface']``, which is passed to
    ``PortPairs``.
    """
    super(VnfdHelper, self).__init__(*args, **kwargs)
    first_vdu = self['vdu'][0]
    external_interfaces = first_vdu['external-interface']
    self.port_pairs = PortPairs(external_interfaces)
    # Cache of resolved port numbers: the port number is not present
    # until binding, so it is memoized once it has been looked up.
    self._port_num_map = {}
def mgmt_interface(self):
    """Return the value stored under the ``"mgmt-interface"`` key.

    The lookup is a plain item access on this mapping, so a missing key
    presumably surfaces as ``KeyError`` (standard ``dict`` behaviour).
    """
    management = self["mgmt-interface"]
    return management
90 return self.vdu0['external-interface']
94 return self['benchmark']['kpi']
96 def find_virtual_interface(self, **kwargs):
97 key, value = next(iter(kwargs.items()))
98 for interface in self.interfaces:
99 virtual_intf = interface["virtual-interface"]
100 if virtual_intf[key] == value:
104 def find_interface(self, **kwargs):
105 key, value = next(iter(kwargs.items()))
106 for interface in self.interfaces:
107 if interface[key] == value:
111 # hide dpdk_port_num key so we can abstract
112 def find_interface_by_port(self, port):
113 for interface in self.interfaces:
114 virtual_intf = interface["virtual-interface"]
115 # we have to convert to int to compare
116 if int(virtual_intf['dpdk_port_num']) == port:
120 def port_num(self, port):
121 # we need interface name -> DPDK port num (PMD ID) -> LINK ID
122 # LINK ID -> PMD ID is governed by the port mask
128 if isinstance(port, dict):
131 intf = self.find_interface(name=port)
132 return self._port_num_map.setdefault(intf["name"],
133 int(intf["virtual-interface"]["dpdk_port_num"]))
def port_nums(self, intfs):
    """Translate several interfaces into their port numbers.

    :param intfs: iterable of values accepted by ``port_num``
    :return: list holding one ``port_num`` result per entry, in the
             same order as ``intfs``
    """
    numbers = []
    for intf in intfs:
        numbers.append(self.port_num(intf))
    return numbers
def ports_iter(self):
    """Lazily yield ``(port_name, port_num)`` for every known port.

    Names are taken from ``self.port_pairs.all_ports``; each number is
    resolved through ``port_num`` only when that pair is consumed.
    """
    for name in self.port_pairs.all_ports:
        yield name, self.port_num(name)
144 class TrafficGeneratorProducer(producer.MessagingProducer):
145 """Class implementing the message producer for traffic generators
147 This message producer must be instantiated in the process created
148 "run_traffic" process.
150 def __init__(self, _id):
151 super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
154 def tg_method_started(self, version=1):
155 """Send a message to inform the traffic generation has started"""
157 messaging.TG_METHOD_STARTED,
158 payloads.TrafficGeneratorPayload(version=version, iteration=0,
161 def tg_method_finished(self, version=1):
162 """Send a message to inform the traffic generation has finished"""
164 messaging.TG_METHOD_FINISHED,
165 payloads.TrafficGeneratorPayload(version=version, iteration=0,
168 def tg_method_iteration(self, iteration, version=1, kpi=None):
169 """Send a message, with KPI, once an iteration has finished"""
170 kpi = {} if kpi is None else kpi
172 messaging.TG_METHOD_ITERATION,
173 payloads.TrafficGeneratorPayload(version=version,
174 iteration=iteration, kpi=kpi))
177 @six.add_metaclass(abc.ABCMeta)
178 class GenericVNF(object):
179 """Class providing file-like API for generic VNF implementation
181 Currently the only class implementing this interface is
182 yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
185 # centralize network naming convention
186 UPLINK = PortPairs.UPLINK
187 DOWNLINK = PortPairs.DOWNLINK
189 def __init__(self, name, vnfd):
191 self.vnfd_helper = VnfdHelper(vnfd)
192 # List of statistics we can obtain from this VNF
193 # - ETSI MANO 6.3.1.1 monitoring_parameter
194 self.kpi = self.vnfd_helper.kpi
195 # Standard dictionary containing params like thread no, buffer size etc
197 self.runs_traffic = False
200 def instantiate(self, scenario_cfg, context_cfg):
201 """Prepare VNF for operation and start the VNF process/VM
203 :param scenario_cfg: Scenario config
204 :param context_cfg: Context config
209 def wait_for_instantiate(self):
210 """Wait for VNF to start
217 """Kill all VNF processes"""
220 def scale(self, flavor=""):
223 :param flavor: Name of the flavor.
228 def collect_kpi(self):
229 """Return a dict containing the selected KPI at a given point of time
231 :return: {"kpi": value, "kpi2": value}
235 def start_collect(self):
236 """Start KPI collection
241 def stop_collect(self):
242 """Stop KPI collection
247 @six.add_metaclass(abc.ABCMeta)
248 class GenericTrafficGen(GenericVNF):
249 """ Class providing file-like API for generic traffic generator """
def __init__(self, name, vnfd):
    """Initialise the generic VNF state, then mark it as a traffic generator.

    :param name: forwarded unchanged to the parent ``__init__``
    :param vnfd: forwarded unchanged to the parent ``__init__``
    """
    super(GenericTrafficGen, self).__init__(name, vnfd)
    # No MQ producer is attached at construction time;
    # get_mq_producer_id() returns None while this stays unset.
    self._mq_producer = None
    # Starts out False on every new traffic generator.
    self.traffic_finished = False
    # Unlike a plain VNF, a traffic generator does run traffic.
    self.runs_traffic = True
258 def run_traffic(self, traffic_profile):
259 """Generate traffic on the wire according to the given params.
261 This method is non-blocking, returns immediately when traffic process
262 is running. Mandatory.
264 :param traffic_profile:
270 """After this method finishes, all traffic processes should stop.
277 def listen_traffic(self, traffic_profile):
278 """Listen to traffic with the given parameters.
280 Method is non-blocking, returns immediately when traffic process
281 is running. Optional.
283 :param traffic_profile:
288 def verify_traffic(self, traffic_profile):
289 """Verify captured traffic after it has ended.
293 :param traffic_profile:
298 def wait_for_instantiate(self):
299 """Wait for an instance to load.
307 def start_collect(self):
308 """Start KPI collection.
310 Traffic measurements are always collected during injection.
318 def stop_collect(self):
319 """Stop KPI collection.
def _setup_mq_producer(id):
    """Setup the TG MQ producer to send messages between processes

    NOTE(review): the signature takes no ``self``; presumably a
    ``@staticmethod`` decorator sits on the line above - confirm.

    :param id: identifier handed to ``TrafficGeneratorProducer``
    :return: (derived class from ``MessagingProducer``) MQ producer object
    """
    mq_producer = TrafficGeneratorProducer(id)
    return mq_producer
def get_mq_producer_id(self):
    """Return the MQ producer ID, or ``None`` when no producer is set."""
    mq_producer = self._mq_producer
    if not mq_producer:
        # Nothing initialised yet: report the absence explicitly.
        return None
    return mq_producer.id