Merge "Improve VMs spawning for OpenStack"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / base.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import abc
16
17 import logging
18 import six
19
20 from yardstick.common import messaging
21 from yardstick.common.messaging import consumer
22 from yardstick.common.messaging import payloads
23 from yardstick.common.messaging import producer
24 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
25
26
27 LOG = logging.getLogger(__name__)
28
29
class QueueFileWrapper(object):
    """File-like facade over an input queue and an output queue.

    Both queues only need to offer ``qsize()``, ``get()`` and ``put()``.
    Written chunks are buffered and forwarded to the output queue once the
    prompt shows up or the buffered text grows past ``bufsize`` characters.
    """

    def __init__(self, q_in, q_out, prompt):
        self.q_in, self.q_out = q_in, q_out
        self.closed = False
        # chunks accumulated by write() that were not forwarded yet
        self.buf = []
        # flush threshold, in characters summed over all buffered chunks
        self.bufsize = 20
        self.prompt = prompt

    def read(self, size):
        """Return the next item of the input queue.

        :param size: any falsy value disables the read
        :return: the queued item, or None when the input queue is empty or
                 ``size`` is falsy
        """
        if self.q_in.qsize() <= 0 or not size:
            return None
        return self.q_in.get()

    def write(self, chunk):
        """Buffer ``chunk``; flush to the output queue when appropriate.

        A flush happens when ``chunk`` contains the prompt or when the total
        buffered length exceeds ``bufsize``. The buffered chunks are joined
        into a single string and put on the output queue as one item.
        """
        self.buf.append(chunk)
        buffered_chars = sum(map(len, self.buf))
        if self.prompt not in chunk and buffered_chars <= self.bufsize:
            # nothing to flush yet, keep accumulating
            return
        joined = ''.join(self.buf)
        self.buf = []
        self.q_out.put(joined)

    def close(self):
        """No-op, present only to complete the file-like API."""
        return None

    def clear(self):
        """Discard every item currently waiting in the output queue."""
        out_queue = self.q_out
        while out_queue.qsize() > 0:
            out_queue.get()
66
67
class VnfdHelper(dict):
    """``dict`` subclass adding convenience accessors to a VNF descriptor.

    The mapping must provide ``['vdu'][0]['external-interface']`` when the
    helper is built, because ``PortPairs`` is created from that list.
    """

    def __init__(self, *args, **kwargs):
        super(VnfdHelper, self).__init__(*args, **kwargs)
        self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
        # port num is not present until binding so we have to memoize
        self._port_num_map = {}

    @property
    def mgmt_interface(self):
        """Value stored under the ``mgmt-interface`` key."""
        return self["mgmt-interface"]

    @property
    def vdu(self):
        """Value stored under the ``vdu`` key (indexed as a sequence)."""
        return self['vdu']

    @property
    def vdu0(self):
        """First entry of ``vdu``."""
        return self.vdu[0]

    @property
    def interfaces(self):
        """``external-interface`` entry of the first VDU."""
        return self.vdu0['external-interface']

    @property
    def kpi(self):
        """Value stored under ``['benchmark']['kpi']``."""
        return self['benchmark']['kpi']

    def find_virtual_interface(self, **kwargs):
        """Return the first interface whose virtual interface matches.

        Exactly one ``key=value`` keyword argument is expected; only one
        pair is considered. The pair is compared against each interface's
        ``virtual-interface`` mapping.

        :raises KeyError: no interface matches, or a virtual interface
                          lacks ``key``
        """
        key, value = next(iter(kwargs.items()))
        for interface in self.interfaces:
            virtual_intf = interface["virtual-interface"]
            if virtual_intf[key] == value:
                return interface
        raise KeyError(
            "no virtual interface with %s=%r" % (key, value))

    def find_interface(self, **kwargs):
        """Return the first interface matching a single ``key=value`` pair.

        Exactly one keyword argument is expected; only one pair is
        considered.

        :raises KeyError: no interface matches, or an interface lacks
                          ``key``
        """
        key, value = next(iter(kwargs.items()))
        for interface in self.interfaces:
            if interface[key] == value:
                return interface
        raise KeyError("no interface with %s=%r" % (key, value))

    # hide dpdk_port_num key so we can abstract
    def find_interface_by_port(self, port):
        """Return the interface whose ``dpdk_port_num`` equals ``port``.

        :param port: port number; compared against the integer value of
                     each interface's ``dpdk_port_num``
        :raises KeyError: no interface matches, or ``dpdk_port_num`` is
                          missing on an interface
        """
        for interface in self.interfaces:
            virtual_intf = interface["virtual-interface"]
            # we have to convert to int to compare
            if int(virtual_intf['dpdk_port_num']) == port:
                return interface
        raise KeyError("no interface with dpdk_port_num=%r" % (port,))

    def port_num(self, port):
        """Return the DPDK port number of an interface, memoized by name.

        The first successful lookup for an interface name is stored in
        ``_port_num_map``. Later calls return the stored number without
        reading ``dpdk_port_num`` again, so they keep working even when
        that key is no longer present on the interface.

        :param port: interface name, or the interface dict itself
        :type port: str or dict
        :rtype: int
        :raises KeyError: unknown interface name, or nothing cached and
                          ``dpdk_port_num`` is missing
        """
        # we need interface name -> DPDK port num (PMD ID) -> LINK ID
        # LINK ID -> PMD ID is governed by the port mask
        if isinstance(port, dict):
            intf = port
        else:
            intf = self.find_interface(name=port)
        name = intf["name"]
        # Look in the cache first: dict.setdefault() would evaluate its
        # default eagerly and so read dpdk_port_num on every call.
        try:
            return self._port_num_map[name]
        except KeyError:
            pass
        number = int(intf["virtual-interface"]["dpdk_port_num"])
        self._port_num_map[name] = number
        return number

    def port_nums(self, intfs):
        """Return the list of ``port_num`` results for each item of ``intfs``."""
        return [self.port_num(i) for i in intfs]

    def ports_iter(self):
        """Yield ``(port_name, port_num)`` for every port in ``port_pairs``."""
        for port_name in self.port_pairs.all_ports:
            port_num = self.port_num(port_name)
            yield port_name, port_num
142
143
class TrafficGeneratorProducer(producer.MessagingProducer):
    """Message producer used by traffic generators on ``messaging.TOPIC_TG``

    This producer must be instantiated inside the process created for
    "run_traffic".
    """
    def __init__(self, _id):
        topic = messaging.TOPIC_TG
        super(TrafficGeneratorProducer, self).__init__(topic, _id=_id)

    def tg_method_started(self, version=1):
        """Notify that the traffic generation has started

        The payload carries ``iteration=0`` and an empty KPI dict.
        """
        payload = payloads.TrafficGeneratorPayload(
            version=version, iteration=0, kpi={})
        self.send_message(messaging.TG_METHOD_STARTED, payload)

    def tg_method_finished(self, version=1):
        """Notify that the traffic generation has finished

        The payload carries ``iteration=0`` and an empty KPI dict.
        """
        payload = payloads.TrafficGeneratorPayload(
            version=version, iteration=0, kpi={})
        self.send_message(messaging.TG_METHOD_FINISHED, payload)

    def tg_method_iteration(self, iteration, version=1, kpi=None):
        """Notify that an iteration has finished, attaching its KPI

        :param iteration: iteration number placed in the payload
        :param version: payload version
        :param kpi: KPI dict for the iteration; an empty dict is sent when
                    None is given
        """
        if kpi is None:
            kpi = {}
        payload = payloads.TrafficGeneratorPayload(
            version=version, iteration=iteration, kpi=kpi)
        self.send_message(messaging.TG_METHOD_ITERATION, payload)
175
176
@six.add_metaclass(abc.ABCMeta)
class GenericVNFEndpoint(consumer.NotificationHandler):
    """Abstract notification endpoint paired with ``GenericVNFConsumer``

    Concrete endpoints have to implement both iteration callbacks below.
    """

    @abc.abstractmethod
    def runner_method_start_iteration(self, ctxt, **kwargs):
        """Callback for a received RUNNER_METHOD_START_ITERATION message

        :param ctxt: (dict) {'id': <Producer ID>}
        :param kwargs: (dict) ``payloads.RunnerPayload`` context
        """

    @abc.abstractmethod
    def runner_method_stop_iteration(self, ctxt, **kwargs):
        """Callback for a received RUNNER_METHOD_STOP_ITERATION message

        :param ctxt: (dict) {'id': <Producer ID>}
        :param kwargs: (dict) ``payloads.RunnerPayload`` context
        """
196
197
class GenericVNFConsumer(consumer.MessagingConsumer):
    """Consumer of ``messaging.TOPIC_RUNNER`` for ``GenericVNF`` classes"""

    def __init__(self, ctx_ids, endpoints):
        """Subscribe the given endpoints to the runner topic

        :param ctx_ids: forwarded unchanged to ``MessagingConsumer``
        :param endpoints: a list of endpoints, or a single endpoint which
                          is wrapped into a one-element list
        """
        endpoint_list = (endpoints if isinstance(endpoints, list)
                         else [endpoints])
        super(GenericVNFConsumer, self).__init__(
            messaging.TOPIC_RUNNER, ctx_ids, endpoint_list)
206
207
@six.add_metaclass(abc.ABCMeta)
class GenericVNF(object):
    """Abstract interface every generic VNF implementation has to provide

    Currently the only class implementing this interface is
    yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
    """

    # centralize network naming convention
    UPLINK, DOWNLINK = PortPairs.UPLINK, PortPairs.DOWNLINK

    def __init__(self, name, vnfd, task_id):
        """Store the identity of the VNF and wrap its descriptor

        :param name: stored as ``self.name``
        :param vnfd: VNF descriptor, wrapped into a ``VnfdHelper``
        :param task_id: stored as ``self._task_id``
        """
        self.name = name
        self._task_id = task_id
        helper = VnfdHelper(vnfd)
        self.vnfd_helper = helper
        # List of statistics we can obtain from this VNF
        # - ETSI MANO 6.3.1.1 monitoring_parameter
        self.kpi = helper.kpi
        # Standard dictionary containing params like thread no, buffer size etc
        self.config = {}
        # plain VNFs do not generate traffic
        self.runs_traffic = False

    @abc.abstractmethod
    def instantiate(self, scenario_cfg, context_cfg):
        """Get the VNF ready to operate and start its process/VM

        :param scenario_cfg: Scenario config
        :param context_cfg: Context config
        :return: True/False
        """

    @abc.abstractmethod
    def wait_for_instantiate(self):
        """Block until the VNF has started

        :return: True/False
        """

    @abc.abstractmethod
    def terminate(self):
        """Stop every process belonging to the VNF"""

    @abc.abstractmethod
    def scale(self, flavor=""):
        """Abstract scaling hook; the semantics are left to implementations

        :param flavor: Name of the flavor.
        :return:
        """

    @abc.abstractmethod
    def collect_kpi(self):
        """Provide the selected KPI values for the current point in time

        :return: {"kpi": value, "kpi2": value}
        """

    @abc.abstractmethod
    def start_collect(self):
        """Begin collecting KPI

        :return: None
        """

    @abc.abstractmethod
    def stop_collect(self):
        """End the KPI collection

        :return: None
        """
277
278
@six.add_metaclass(abc.ABCMeta)
class GenericTrafficGen(GenericVNF):
    """Abstract interface for traffic generators, built on ``GenericVNF``

    ``run_traffic`` and ``terminate`` are mandatory; the remaining hooks
    have default implementations that do nothing and return None.
    """

    def __init__(self, name, vnfd, task_id):
        """Initialize the VNF part and flag the instance as traffic source"""
        super(GenericTrafficGen, self).__init__(name, vnfd, task_id)
        self.runs_traffic = True
        self.traffic_finished = False
        # no MQ producer until one is assigned to this attribute
        self._mq_producer = None

    @abc.abstractmethod
    def run_traffic(self, traffic_profile):
        """Put traffic on the wire as described by the given profile.

        Non-blocking: returns as soon as the traffic process is running.
        Mandatory.

        :param traffic_profile:
        :return: True/False
        """

    @abc.abstractmethod
    def terminate(self):
        """Stop all traffic processes before returning.

        Mandatory.

        :return: True/False
        """

    def listen_traffic(self, traffic_profile):
        """Listen for traffic matching the given parameters.

        Non-blocking: returns as soon as the traffic process is running.
        Optional.

        :param traffic_profile:
        :return: True/False
        """

    def verify_traffic(self, traffic_profile):
        """Check the captured traffic once it has ended.

        Optional.

        :param traffic_profile:
        :return: dict
        """

    def wait_for_instantiate(self):
        """Wait until an instance has loaded.

        Optional.

        :return: True/False
        """

    def start_collect(self):
        """Begin collecting KPI.

        Traffic measurements are always collected during injection.

        Optional.

        :return: True/False
        """

    def stop_collect(self):
        """End the KPI collection.

        Optional.

        :return: True/False
        """

    @staticmethod
    def _setup_mq_producer(id):
        """Create the TG MQ producer used to send messages between processes

        :param id: identifier handed to ``TrafficGeneratorProducer``; the
                   name shadows the builtin but is part of the signature
        :return: (derived class from ``MessagingProducer``) MQ producer object
        """
        mq_producer = TrafficGeneratorProducer(id)
        return mq_producer

    def get_mq_producer_id(self):
        """Return the MQ producer ID, or None if no producer is set"""
        if not self._mq_producer:
            return None
        return self._mq_producer.id
369         if self._mq_producer:
370             return self._mq_producer.id