1 # Copyright (c) 2016-2017 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from yardstick.common import messaging
21 from yardstick.common.messaging import consumer
22 from yardstick.common.messaging import payloads
23 from yardstick.common.messaging import producer
24 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
27 LOG = logging.getLogger(__name__)
30 class QueueFileWrapper(object):
31 """ Class providing file-like API for talking with SSH connection """
33 def __init__(self, q_in, q_out, prompt):
42 """ read chunk from input queue """
43 if self.q_in.qsize() > 0 and size:
44 in_data = self.q_in.get()
47 def write(self, chunk):
48 """ write chunk to output queue """
49 self.buf.append(chunk)
50 # flush on prompt or if we exceed bufsize
52 size = sum(len(c) for c in self.buf)
53 if self.prompt in chunk or size > self.bufsize:
54 out = ''.join(self.buf)
59 """ close multiprocessing queue """
64 while self.q_out.qsize() > 0:
68 class VnfdHelper(dict):
70 def __init__(self, *args, **kwargs):
71 super(VnfdHelper, self).__init__(*args, **kwargs)
72 self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
73 # port num is not present until binding so we have to memoize
74 self._port_num_map = {}
77 def mgmt_interface(self):
78 return self["mgmt-interface"]
90 return self.vdu0['external-interface']
94 return self['benchmark']['kpi']
96 def find_virtual_interface(self, **kwargs):
97 key, value = next(iter(kwargs.items()))
98 for interface in self.interfaces:
99 virtual_intf = interface["virtual-interface"]
100 if virtual_intf[key] == value:
104 def find_interface(self, **kwargs):
105 key, value = next(iter(kwargs.items()))
106 for interface in self.interfaces:
107 if interface[key] == value:
111 # hide dpdk_port_num key so we can abstract
112 def find_interface_by_port(self, port):
113 for interface in self.interfaces:
114 virtual_intf = interface["virtual-interface"]
115 # we have to convert to int to compare
116 if int(virtual_intf['dpdk_port_num']) == port:
120 def port_num(self, port):
121 # we need interface name -> DPDK port num (PMD ID) -> LINK ID
122 # LINK ID -> PMD ID is governed by the port mask
128 if isinstance(port, dict):
131 intf = self.find_interface(name=port)
132 return self._port_num_map.setdefault(intf["name"],
133 int(intf["virtual-interface"]["dpdk_port_num"]))
135 def port_nums(self, intfs):
136 return [self.port_num(i) for i in intfs]
138 def ports_iter(self):
139 for port_name in self.port_pairs.all_ports:
140 port_num = self.port_num(port_name)
141 yield port_name, port_num
144 class TrafficGeneratorProducer(producer.MessagingProducer):
145 """Class implementing the message producer for traffic generators
147 This message producer must be instantiated in the process created
148 "run_traffic" process.
150 def __init__(self, _id):
151 super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
154 def tg_method_started(self, version=1):
155 """Send a message to inform the traffic generation has started"""
157 messaging.TG_METHOD_STARTED,
158 payloads.TrafficGeneratorPayload(version=version, iteration=0,
161 def tg_method_finished(self, version=1):
162 """Send a message to inform the traffic generation has finished"""
164 messaging.TG_METHOD_FINISHED,
165 payloads.TrafficGeneratorPayload(version=version, iteration=0,
168 def tg_method_iteration(self, iteration, version=1, kpi=None):
169 """Send a message, with KPI, once an iteration has finished"""
170 kpi = {} if kpi is None else kpi
172 messaging.TG_METHOD_ITERATION,
173 payloads.TrafficGeneratorPayload(version=version,
174 iteration=iteration, kpi=kpi))
177 @six.add_metaclass(abc.ABCMeta)
178 class GenericVNFEndpoint(consumer.NotificationHandler):
179 """Endpoint class for ``GenericVNFConsumer``"""
182 def runner_method_start_iteration(self, ctxt, **kwargs):
183 """Endpoint when RUNNER_METHOD_START_ITERATION is received
185 :param ctxt: (dict) {'id': <Producer ID>}
186 :param kwargs: (dict) ``payloads.RunnerPayload`` context
190 def runner_method_stop_iteration(self, ctxt, **kwargs):
191 """Endpoint when RUNNER_METHOD_STOP_ITERATION is received
193 :param ctxt: (dict) {'id': <Producer ID>}
194 :param kwargs: (dict) ``payloads.RunnerPayload`` context
198 class GenericVNFConsumer(consumer.MessagingConsumer):
199 """MQ consumer for ``GenericVNF`` derived classes"""
201 def __init__(self, ctx_ids, endpoints):
202 if not isinstance(endpoints, list):
203 endpoints = [endpoints]
204 super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER,
208 @six.add_metaclass(abc.ABCMeta)
209 class GenericVNF(object):
210 """Class providing file-like API for generic VNF implementation
212 Currently the only class implementing this interface is
213 yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
216 # centralize network naming convention
217 UPLINK = PortPairs.UPLINK
218 DOWNLINK = PortPairs.DOWNLINK
220 def __init__(self, name, vnfd, task_id):
222 self._task_id = task_id
223 self.vnfd_helper = VnfdHelper(vnfd)
224 # List of statistics we can obtain from this VNF
225 # - ETSI MANO 6.3.1.1 monitoring_parameter
226 self.kpi = self.vnfd_helper.kpi
227 # Standard dictionary containing params like thread no, buffer size etc
229 self.runs_traffic = False
232 def instantiate(self, scenario_cfg, context_cfg):
233 """Prepare VNF for operation and start the VNF process/VM
235 :param scenario_cfg: Scenario config
236 :param context_cfg: Context config
241 def wait_for_instantiate(self):
242 """Wait for VNF to start
249 """Kill all VNF processes"""
252 def scale(self, flavor=""):
255 :param flavor: Name of the flavor.
260 def collect_kpi(self):
261 """Return a dict containing the selected KPI at a given point of time
263 :return: {"kpi": value, "kpi2": value}
267 def start_collect(self):
268 """Start KPI collection
273 def stop_collect(self):
274 """Stop KPI collection
279 @six.add_metaclass(abc.ABCMeta)
280 class GenericTrafficGen(GenericVNF):
281 """Class providing file-like API for generic traffic generator"""
283 def __init__(self, name, vnfd, task_id):
284 super(GenericTrafficGen, self).__init__(name, vnfd, task_id)
285 self.runs_traffic = True
286 self.traffic_finished = False
287 self._mq_producer = None
290 def run_traffic(self, traffic_profile):
291 """Generate traffic on the wire according to the given params.
293 This method is non-blocking, returns immediately when traffic process
294 is running. Mandatory.
296 :param traffic_profile:
302 """After this method finishes, all traffic processes should stop.
309 def listen_traffic(self, traffic_profile):
310 """Listen to traffic with the given parameters.
312 Method is non-blocking, returns immediately when traffic process
313 is running. Optional.
315 :param traffic_profile:
320 def verify_traffic(self, traffic_profile):
321 """Verify captured traffic after it has ended.
325 :param traffic_profile:
330 def wait_for_instantiate(self):
331 """Wait for an instance to load.
339 def start_collect(self):
340 """Start KPI collection.
342 Traffic measurements are always collected during injection.
350 def stop_collect(self):
351 """Stop KPI collection.
360 def _setup_mq_producer(id):
361 """Setup the TG MQ producer to send messages between processes
363 :return: (derived class from ``MessagingProducer``) MQ producer object
365 return TrafficGeneratorProducer(id)
367 def get_mq_producer_id(self):
368 """Return the MQ producer ID if initialized"""
369 if self._mq_producer:
370 return self._mq_producer.id