Merge "Add "resources" parameter in Kubernetes context"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / base.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 import abc
17
18 import logging
19 import six
20
21 from yardstick.common import messaging
22 from yardstick.common.messaging import payloads
23 from yardstick.common.messaging import producer
24 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
25
26
27 LOG = logging.getLogger(__name__)
28
29
30 class QueueFileWrapper(object):
31     """ Class providing file-like API for talking with SSH connection """
32
33     def __init__(self, q_in, q_out, prompt):
34         self.q_in = q_in
35         self.q_out = q_out
36         self.closed = False
37         self.buf = []
38         self.bufsize = 20
39         self.prompt = prompt
40
41     def read(self, size):
42         """ read chunk from input queue """
43         if self.q_in.qsize() > 0 and size:
44             in_data = self.q_in.get()
45             return in_data
46
47     def write(self, chunk):
48         """ write chunk to output queue """
49         self.buf.append(chunk)
50         # flush on prompt or if we exceed bufsize
51
52         size = sum(len(c) for c in self.buf)
53         if self.prompt in chunk or size > self.bufsize:
54             out = ''.join(self.buf)
55             self.buf = []
56             self.q_out.put(out)
57
58     def close(self):
59         """ close multiprocessing queue """
60         pass
61
62     def clear(self):
63         """ clear queue """
64         while self.q_out.qsize() > 0:
65             self.q_out.get()
66
67
68 class VnfdHelper(dict):
69
70     def __init__(self, *args, **kwargs):
71         super(VnfdHelper, self).__init__(*args, **kwargs)
72         self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
73         # port num is not present until binding so we have to memoize
74         self._port_num_map = {}
75
76     @property
77     def mgmt_interface(self):
78         return self["mgmt-interface"]
79
80     @property
81     def vdu(self):
82         return self['vdu']
83
84     @property
85     def vdu0(self):
86         return self.vdu[0]
87
88     @property
89     def interfaces(self):
90         return self.vdu0['external-interface']
91
92     @property
93     def kpi(self):
94         return self['benchmark']['kpi']
95
96     def find_virtual_interface(self, **kwargs):
97         key, value = next(iter(kwargs.items()))
98         for interface in self.interfaces:
99             virtual_intf = interface["virtual-interface"]
100             if virtual_intf[key] == value:
101                 return interface
102         raise KeyError()
103
104     def find_interface(self, **kwargs):
105         key, value = next(iter(kwargs.items()))
106         for interface in self.interfaces:
107             if interface[key] == value:
108                 return interface
109         raise KeyError()
110
111     # hide dpdk_port_num key so we can abstract
112     def find_interface_by_port(self, port):
113         for interface in self.interfaces:
114             virtual_intf = interface["virtual-interface"]
115             # we have to convert to int to compare
116             if int(virtual_intf['dpdk_port_num']) == port:
117                 return interface
118         raise KeyError()
119
120     def port_num(self, port):
121         # we need interface name -> DPDK port num (PMD ID) -> LINK ID
122         # LINK ID -> PMD ID is governed by the port mask
123         """
124
125         :rtype: int
126         :type port: str
127         """
128         if isinstance(port, dict):
129             intf = port
130         else:
131             intf = self.find_interface(name=port)
132         return self._port_num_map.setdefault(intf["name"],
133                                              int(intf["virtual-interface"]["dpdk_port_num"]))
134
135     def port_nums(self, intfs):
136         return [self.port_num(i) for i in intfs]
137
138     def ports_iter(self):
139         for port_name in self.port_pairs.all_ports:
140             port_num = self.port_num(port_name)
141             yield port_name, port_num
142
143
144 class TrafficGeneratorProducer(producer.MessagingProducer):
145     """Class implementing the message producer for traffic generators
146
147     This message producer must be instantiated in the process created
148     "run_traffic" process.
149     """
150     def __init__(self, _id):
151         super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
152                                                        _id=_id)
153
154     def tg_method_started(self, version=1):
155         """Send a message to inform the traffic generation has started"""
156         self.send_message(
157             messaging.TG_METHOD_STARTED,
158             payloads.TrafficGeneratorPayload(version=version, iteration=0,
159                                              kpi={}))
160
161     def tg_method_finished(self, version=1):
162         """Send a message to inform the traffic generation has finished"""
163         self.send_message(
164             messaging.TG_METHOD_FINISHED,
165             payloads.TrafficGeneratorPayload(version=version, iteration=0,
166                                              kpi={}))
167
168     def tg_method_iteration(self, iteration, version=1, kpi=None):
169         """Send a message, with KPI, once an iteration has finished"""
170         kpi = {} if kpi is None else kpi
171         self.send_message(
172             messaging.TG_METHOD_ITERATION,
173             payloads.TrafficGeneratorPayload(version=version,
174                                              iteration=iteration, kpi=kpi))
175
176
177 @six.add_metaclass(abc.ABCMeta)
178 class GenericVNF(object):
179     """Class providing file-like API for generic VNF implementation
180
181     Currently the only class implementing this interface is
182     yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
183     """
184
185     # centralize network naming convention
186     UPLINK = PortPairs.UPLINK
187     DOWNLINK = PortPairs.DOWNLINK
188
189     def __init__(self, name, vnfd):
190         self.name = name
191         self.vnfd_helper = VnfdHelper(vnfd)
192         # List of statistics we can obtain from this VNF
193         # - ETSI MANO 6.3.1.1 monitoring_parameter
194         self.kpi = self.vnfd_helper.kpi
195         # Standard dictionary containing params like thread no, buffer size etc
196         self.config = {}
197         self.runs_traffic = False
198
199     @abc.abstractmethod
200     def instantiate(self, scenario_cfg, context_cfg):
201         """Prepare VNF for operation and start the VNF process/VM
202
203         :param scenario_cfg: Scenario config
204         :param context_cfg: Context config
205         :return: True/False
206         """
207
208     @abc.abstractmethod
209     def wait_for_instantiate(self):
210         """Wait for VNF to start
211
212         :return: True/False
213         """
214
215     @abc.abstractmethod
216     def terminate(self):
217         """Kill all VNF processes"""
218
219     @abc.abstractmethod
220     def scale(self, flavor=""):
221         """rest
222
223         :param flavor: Name of the flavor.
224         :return:
225         """
226
227     @abc.abstractmethod
228     def collect_kpi(self):
229         """Return a dict containing the selected KPI at a given point of time
230
231         :return: {"kpi": value, "kpi2": value}
232         """
233
234     @abc.abstractmethod
235     def start_collect(self):
236         """Start KPI collection
237         :return: None
238         """
239
240     @abc.abstractmethod
241     def stop_collect(self):
242         """Stop KPI collection
243         :return: None
244         """
245
246
247 @six.add_metaclass(abc.ABCMeta)
248 class GenericTrafficGen(GenericVNF):
249     """ Class providing file-like API for generic traffic generator """
250
251     def __init__(self, name, vnfd):
252         super(GenericTrafficGen, self).__init__(name, vnfd)
253         self.runs_traffic = True
254         self.traffic_finished = False
255         self._mq_producer = None
256
257     @abc.abstractmethod
258     def run_traffic(self, traffic_profile):
259         """Generate traffic on the wire according to the given params.
260
261         This method is non-blocking, returns immediately when traffic process
262         is running. Mandatory.
263
264         :param traffic_profile:
265         :return: True/False
266         """
267
268     @abc.abstractmethod
269     def terminate(self):
270         """After this method finishes, all traffic processes should stop.
271
272         Mandatory.
273
274         :return: True/False
275         """
276
277     def listen_traffic(self, traffic_profile):
278         """Listen to traffic with the given parameters.
279
280         Method is non-blocking, returns immediately when traffic process
281         is running. Optional.
282
283         :param traffic_profile:
284         :return: True/False
285         """
286         pass
287
288     def verify_traffic(self, traffic_profile):
289         """Verify captured traffic after it has ended.
290
291         Optional.
292
293         :param traffic_profile:
294         :return: dict
295         """
296         pass
297
298     def wait_for_instantiate(self):
299         """Wait for an instance to load.
300
301         Optional.
302
303         :return: True/False
304         """
305         pass
306
307     def start_collect(self):
308         """Start KPI collection.
309
310         Traffic measurements are always collected during injection.
311
312         Optional.
313
314         :return: True/False
315         """
316         pass
317
318     def stop_collect(self):
319         """Stop KPI collection.
320
321         Optional.
322
323         :return: True/False
324         """
325         pass
326
327     @staticmethod
328     def _setup_mq_producer(id):
329         """Setup the TG MQ producer to send messages between processes
330
331         :return: (derived class from ``MessagingProducer``) MQ producer object
332         """
333         return TrafficGeneratorProducer(id)
334
335     def get_mq_producer_id(self):
336         """Return the MQ producer ID if initialized"""
337         if self._mq_producer:
338             return self._mq_producer.get_id()