# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-""" Base class implementation for generic vnf implementation """
-from __future__ import absolute_import
+import abc
+
import logging
+import six
+from yardstick.common import messaging
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
+
LOG = logging.getLogger(__name__)
def __init__(self, *args, **kwargs):
super(VnfdHelper, self).__init__(*args, **kwargs)
self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
+ # port num is not present until binding so we have to memoize
+ self._port_num_map = {}
@property
def mgmt_interface(self):
virtual_intf = interface["virtual-interface"]
if virtual_intf[key] == value:
return interface
+ raise KeyError()
def find_interface(self, **kwargs):
key, value = next(iter(kwargs.items()))
for interface in self.interfaces:
if interface[key] == value:
return interface
+ raise KeyError()
# hide dpdk_port_num key so we can abstract
def find_interface_by_port(self, port):
# we have to convert to int to compare
if int(virtual_intf['dpdk_port_num']) == port:
return interface
+ raise KeyError()
- def port_num(self, name):
+ def port_num(self, port):
# we need interface name -> DPDK port num (PMD ID) -> LINK ID
# LINK ID -> PMD ID is governed by the port mask
"""
:rtype: int
- :type name: str
+ :type port: str
"""
- intf = self.find_interface(name=name)
- return int(intf["virtual-interface"]["dpdk_port_num"])
+ if isinstance(port, dict):
+ intf = port
+ else:
+ intf = self.find_interface(name=port)
+ return self._port_num_map.setdefault(intf["name"],
+ int(intf["virtual-interface"]["dpdk_port_num"]))
def port_nums(self, intfs):
return [self.port_num(i) for i in intfs]
+ def ports_iter(self):
+ for port_name in self.port_pairs.all_ports:
+ port_num = self.port_num(port_name)
+ yield port_name, port_num
+
+
+class TrafficGeneratorProducer(producer.MessagingProducer):
+ """Class implementing the message producer for traffic generators
+
+ This message producer must be instantiated in the process created
+ "run_traffic" process.
+ """
+ def __init__(self, _id):
+ super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
+ _id=_id)
+
+ def tg_method_started(self, version=1):
+ """Send a message to inform the traffic generation has started"""
+ self.send_message(
+ messaging.TG_METHOD_STARTED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_finished(self, version=1):
+ """Send a message to inform the traffic generation has finished"""
+ self.send_message(
+ messaging.TG_METHOD_FINISHED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_iteration(self, iteration, version=1, kpi=None):
+ """Send a message, with KPI, once an iteration has finished"""
+ kpi = {} if kpi is None else kpi
+ self.send_message(
+ messaging.TG_METHOD_ITERATION,
+ payloads.TrafficGeneratorPayload(version=version,
+ iteration=iteration, kpi=kpi))
+
+
+@six.add_metaclass(abc.ABCMeta)
+class GenericVNFEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``GenericVNFConsumer``"""
+
+ @abc.abstractmethod
+ def runner_method_start_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_START_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
+ @abc.abstractmethod
+ def runner_method_stop_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_STOP_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
-class VNFObject(object):
+class GenericVNFConsumer(consumer.MessagingConsumer):
+ """MQ consumer for ``GenericVNF`` derived classes"""
+
+ def __init__(self, ctx_ids, endpoints):
+ if not isinstance(endpoints, list):
+ endpoints = [endpoints]
+ super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER,
+ ctx_ids, endpoints)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class GenericVNF(object):
+ """Class providing file-like API for generic VNF implementation
+
+ Currently the only class implementing this interface is
+ yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
+ """
# centralize network naming convention
UPLINK = PortPairs.UPLINK
DOWNLINK = PortPairs.DOWNLINK
- def __init__(self, name, vnfd):
- super(VNFObject, self).__init__()
+ def __init__(self, name, vnfd, task_id):
self.name = name
- self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
-
-
-class GenericVNF(VNFObject):
-
- """ Class providing file-like API for generic VNF implementation """
- def __init__(self, name, vnfd):
- super(GenericVNF, self).__init__(name, vnfd)
+ self._task_id = task_id
+ self.vnfd_helper = VnfdHelper(vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition()
+ self.kpi = self.vnfd_helper.kpi
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- def _get_kpi_definition(self):
- """ Get list of KPIs defined in VNFD
+ @abc.abstractmethod
+ def instantiate(self, scenario_cfg, context_cfg):
+ """Prepare VNF for operation and start the VNF process/VM
- :param vnfd:
- :return: list of KPIs, e.g. ['throughput', 'latency']
+ :param scenario_cfg: Scenario config
+ :param context_cfg: Context config
+ :return: True/False
"""
- return self.vnfd_helper.kpi
- def instantiate(self, scenario_cfg, context_cfg):
- """ Prepare VNF for operation and start the VNF process/VM
+ @abc.abstractmethod
+ def wait_for_instantiate(self):
+ """Wait for VNF to start
- :param scenario_cfg:
- :param context_cfg:
:return: True/False
"""
- raise NotImplementedError()
+ @abc.abstractmethod
def terminate(self):
- """ Kill all VNF processes
-
- :return:
- """
- raise NotImplementedError()
+ """Kill all VNF processes"""
+ @abc.abstractmethod
def scale(self, flavor=""):
- """
+ """rest
- :param flavor:
+ :param flavor: Name of the flavor.
:return:
"""
- raise NotImplementedError()
+ @abc.abstractmethod
def collect_kpi(self):
- """This method should return a dictionary containing the
- selected KPI at a given point of time.
+ """Return a dict containing the selected KPI at a given point of time
:return: {"kpi": value, "kpi2": value}
"""
- raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start_collect(self):
+ """Start KPI collection
+ :return: None
+ """
+
+ @abc.abstractmethod
+ def stop_collect(self):
+ """Stop KPI collection
+ :return: None
+ """
+@six.add_metaclass(abc.ABCMeta)
class GenericTrafficGen(GenericVNF):
- """ Class providing file-like API for generic traffic generator """
+ """Class providing file-like API for generic traffic generator"""
- def __init__(self, name, vnfd):
- super(GenericTrafficGen, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id):
+ super(GenericTrafficGen, self).__init__(name, vnfd, task_id)
self.runs_traffic = True
self.traffic_finished = False
+ self._mq_producer = None
+ @abc.abstractmethod
def run_traffic(self, traffic_profile):
- """ Generate traffic on the wire according to the given params.
- Method is non-blocking, returns immediately when traffic process
+ """Generate traffic on the wire according to the given params.
+
+ This method is non-blocking, returns immediately when traffic process
is running. Mandatory.
:param traffic_profile:
:return: True/False
"""
- raise NotImplementedError()
+
+ @abc.abstractmethod
+ def terminate(self):
+ """After this method finishes, all traffic processes should stop.
+
+ Mandatory.
+
+ :return: True/False
+ """
def listen_traffic(self, traffic_profile):
- """ Listen to traffic with the given parameters.
+ """Listen to traffic with the given parameters.
+
Method is non-blocking, returns immediately when traffic process
is running. Optional.
pass
def verify_traffic(self, traffic_profile):
- """ Verify captured traffic after it has ended. Optional.
+ """Verify captured traffic after it has ended.
+
+ Optional.
:param traffic_profile:
:return: dict
"""
pass
- def terminate(self):
- """ After this method finishes, all traffic processes should stop. Mandatory.
+ def wait_for_instantiate(self):
+ """Wait for an instance to load.
+
+ Optional.
:return: True/False
"""
- raise NotImplementedError()
+ pass
+
+ def start_collect(self):
+ """Start KPI collection.
+
+ Traffic measurements are always collected during injection.
+
+ Optional.
+
+ :return: True/False
+ """
+ pass
+
+ def stop_collect(self):
+ """Stop KPI collection.
+
+ Optional.
+
+ :return: True/False
+ """
+ pass
+
+ @staticmethod
+ def _setup_mq_producer(id):
+ """Setup the TG MQ producer to send messages between processes
+
+ :return: (derived class from ``MessagingProducer``) MQ producer object
+ """
+ return TrafficGeneratorProducer(id)
+
+ def get_mq_producer_id(self):
+ """Return the MQ producer ID if initialized"""
+ if self._mq_producer:
+ return self._mq_producer.id