804257fe903680df6be56c0f4982fb79794a97a2
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / base.py
1 # Copyright (c) 2016-2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 """ Base class implementation for generic vnf implementation """
15
16 import abc
17
18 import logging
19 import six
20
21 from yardstick.common import messaging
22 from yardstick.common.messaging import producer
23 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
24
25
26 LOG = logging.getLogger(__name__)
27
28
29 class QueueFileWrapper(object):
30     """ Class providing file-like API for talking with SSH connection """
31
32     def __init__(self, q_in, q_out, prompt):
33         self.q_in = q_in
34         self.q_out = q_out
35         self.closed = False
36         self.buf = []
37         self.bufsize = 20
38         self.prompt = prompt
39
40     def read(self, size):
41         """ read chunk from input queue """
42         if self.q_in.qsize() > 0 and size:
43             in_data = self.q_in.get()
44             return in_data
45
46     def write(self, chunk):
47         """ write chunk to output queue """
48         self.buf.append(chunk)
49         # flush on prompt or if we exceed bufsize
50
51         size = sum(len(c) for c in self.buf)
52         if self.prompt in chunk or size > self.bufsize:
53             out = ''.join(self.buf)
54             self.buf = []
55             self.q_out.put(out)
56
57     def close(self):
58         """ close multiprocessing queue """
59         pass
60
61     def clear(self):
62         """ clear queue """
63         while self.q_out.qsize() > 0:
64             self.q_out.get()
65
66
67 class VnfdHelper(dict):
68
69     def __init__(self, *args, **kwargs):
70         super(VnfdHelper, self).__init__(*args, **kwargs)
71         self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
72         # port num is not present until binding so we have to memoize
73         self._port_num_map = {}
74
75     @property
76     def mgmt_interface(self):
77         return self["mgmt-interface"]
78
79     @property
80     def vdu(self):
81         return self['vdu']
82
83     @property
84     def vdu0(self):
85         return self.vdu[0]
86
87     @property
88     def interfaces(self):
89         return self.vdu0['external-interface']
90
91     @property
92     def kpi(self):
93         return self['benchmark']['kpi']
94
95     def find_virtual_interface(self, **kwargs):
96         key, value = next(iter(kwargs.items()))
97         for interface in self.interfaces:
98             virtual_intf = interface["virtual-interface"]
99             if virtual_intf[key] == value:
100                 return interface
101         raise KeyError()
102
103     def find_interface(self, **kwargs):
104         key, value = next(iter(kwargs.items()))
105         for interface in self.interfaces:
106             if interface[key] == value:
107                 return interface
108         raise KeyError()
109
110     # hide dpdk_port_num key so we can abstract
111     def find_interface_by_port(self, port):
112         for interface in self.interfaces:
113             virtual_intf = interface["virtual-interface"]
114             # we have to convert to int to compare
115             if int(virtual_intf['dpdk_port_num']) == port:
116                 return interface
117         raise KeyError()
118
119     def port_num(self, port):
120         # we need interface name -> DPDK port num (PMD ID) -> LINK ID
121         # LINK ID -> PMD ID is governed by the port mask
122         """
123
124         :rtype: int
125         :type port: str
126         """
127         if isinstance(port, dict):
128             intf = port
129         else:
130             intf = self.find_interface(name=port)
131         return self._port_num_map.setdefault(intf["name"],
132                                              int(intf["virtual-interface"]["dpdk_port_num"]))
133
134     def port_nums(self, intfs):
135         return [self.port_num(i) for i in intfs]
136
137     def ports_iter(self):
138         for port_name in self.port_pairs.all_ports:
139             port_num = self.port_num(port_name)
140             yield port_name, port_num
141
142
143 class TrafficGeneratorProducer(producer.MessagingProducer):
144     """Class implementing the message producer for traffic generators
145
146     This message producer must be instantiated in the process created
147     "run_traffic" process.
148     """
149     def __init__(self, pid):
150         super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
151                                                        pid=pid)
152
153
154 @six.add_metaclass(abc.ABCMeta)
155 class GenericVNF(object):
156     """Class providing file-like API for generic VNF implementation
157
158     Currently the only class implementing this interface is
159     yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
160     """
161
162     # centralize network naming convention
163     UPLINK = PortPairs.UPLINK
164     DOWNLINK = PortPairs.DOWNLINK
165
166     def __init__(self, name, vnfd):
167         self.name = name
168         self.vnfd_helper = VnfdHelper(vnfd)
169         # List of statistics we can obtain from this VNF
170         # - ETSI MANO 6.3.1.1 monitoring_parameter
171         self.kpi = self.vnfd_helper.kpi
172         # Standard dictionary containing params like thread no, buffer size etc
173         self.config = {}
174         self.runs_traffic = False
175
176     @abc.abstractmethod
177     def instantiate(self, scenario_cfg, context_cfg):
178         """Prepare VNF for operation and start the VNF process/VM
179
180         :param scenario_cfg: Scenario config
181         :param context_cfg: Context config
182         :return: True/False
183         """
184
185     @abc.abstractmethod
186     def wait_for_instantiate(self):
187         """Wait for VNF to start
188
189         :return: True/False
190         """
191
192     @abc.abstractmethod
193     def terminate(self):
194         """Kill all VNF processes"""
195
196     @abc.abstractmethod
197     def scale(self, flavor=""):
198         """rest
199
200         :param flavor: Name of the flavor.
201         :return:
202         """
203
204     @abc.abstractmethod
205     def collect_kpi(self):
206         """Return a dict containing the selected KPI at a given point of time
207
208         :return: {"kpi": value, "kpi2": value}
209         """
210
211     @abc.abstractmethod
212     def start_collect(self):
213         """Start KPI collection
214         :return: None
215         """
216
217     @abc.abstractmethod
218     def stop_collect(self):
219         """Stop KPI collection
220         :return: None
221         """
222
223
224 @six.add_metaclass(abc.ABCMeta)
225 class GenericTrafficGen(GenericVNF):
226     """ Class providing file-like API for generic traffic generator """
227
228     def __init__(self, name, vnfd):
229         super(GenericTrafficGen, self).__init__(name, vnfd)
230         self.runs_traffic = True
231         self.traffic_finished = False
232         self._mq_producer = None
233
234     @abc.abstractmethod
235     def run_traffic(self, traffic_profile):
236         """Generate traffic on the wire according to the given params.
237
238         This method is non-blocking, returns immediately when traffic process
239         is running. Mandatory.
240
241         :param traffic_profile:
242         :return: True/False
243         """
244
245     @abc.abstractmethod
246     def terminate(self):
247         """After this method finishes, all traffic processes should stop.
248
249         Mandatory.
250
251         :return: True/False
252         """
253
254     def listen_traffic(self, traffic_profile):
255         """Listen to traffic with the given parameters.
256
257         Method is non-blocking, returns immediately when traffic process
258         is running. Optional.
259
260         :param traffic_profile:
261         :return: True/False
262         """
263         pass
264
265     def verify_traffic(self, traffic_profile):
266         """Verify captured traffic after it has ended.
267
268         Optional.
269
270         :param traffic_profile:
271         :return: dict
272         """
273         pass
274
275     def wait_for_instantiate(self):
276         """Wait for an instance to load.
277
278         Optional.
279
280         :return: True/False
281         """
282         pass
283
284     def start_collect(self):
285         """Start KPI collection.
286
287         Traffic measurements are always collected during injection.
288
289         Optional.
290
291         :return: True/False
292         """
293         pass
294
295     def stop_collect(self):
296         """Stop KPI collection.
297
298         Optional.
299
300         :return: True/False
301         """
302         pass
303
304     def _setup_mq_producer(self, pid):
305         """Setup the TG MQ producer to send messages between processes
306
307         :return: (derived class from ``MessagingProducer``) MQ producer object
308         """
309         return TrafficGeneratorProducer(pid)