# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-""" Base class implementation for generic vnf implementation """
import abc
import logging
import six
+from yardstick.common import messaging
+from yardstick.common.messaging import consumer
+from yardstick.common.messaging import payloads
+from yardstick.common.messaging import producer
from yardstick.network_services.helpers.samplevnf_helper import PortPairs
yield port_name, port_num
-class VNFObject(object):
+class TrafficGeneratorProducer(producer.MessagingProducer):
+ """Class implementing the message producer for traffic generators
+
+ This message producer must be instantiated in the process created
+ "run_traffic" process.
+ """
+ def __init__(self, _id):
+ super(TrafficGeneratorProducer, self).__init__(messaging.TOPIC_TG,
+ _id=_id)
+
+ def tg_method_started(self, version=1):
+ """Send a message to inform the traffic generation has started"""
+ self.send_message(
+ messaging.TG_METHOD_STARTED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_finished(self, version=1):
+ """Send a message to inform the traffic generation has finished"""
+ self.send_message(
+ messaging.TG_METHOD_FINISHED,
+ payloads.TrafficGeneratorPayload(version=version, iteration=0,
+ kpi={}))
+
+ def tg_method_iteration(self, iteration, version=1, kpi=None):
+ """Send a message, with KPI, once an iteration has finished"""
+ kpi = {} if kpi is None else kpi
+ self.send_message(
+ messaging.TG_METHOD_ITERATION,
+ payloads.TrafficGeneratorPayload(version=version,
+ iteration=iteration, kpi=kpi))
+
+
+@six.add_metaclass(abc.ABCMeta)
+class GenericVNFEndpoint(consumer.NotificationHandler):
+ """Endpoint class for ``GenericVNFConsumer``"""
+
+ @abc.abstractmethod
+ def runner_method_start_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_START_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
+ @abc.abstractmethod
+ def runner_method_stop_iteration(self, ctxt, **kwargs):
+ """Endpoint when RUNNER_METHOD_STOP_ITERATION is received
+
+ :param ctxt: (dict) {'id': <Producer ID>}
+ :param kwargs: (dict) ``payloads.RunnerPayload`` context
+ """
+
+
+class GenericVNFConsumer(consumer.MessagingConsumer):
+ """MQ consumer for ``GenericVNF`` derived classes"""
+
+ def __init__(self, ctx_ids, endpoints):
+ if not isinstance(endpoints, list):
+ endpoints = [endpoints]
+ super(GenericVNFConsumer, self).__init__(messaging.TOPIC_RUNNER,
+ ctx_ids, endpoints)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class GenericVNF(object):
+ """Class providing file-like API for generic VNF implementation
+
+ Currently the only class implementing this interface is
+ yardstick/network_services/vnf_generic/vnf/sample_vnf:SampleVNF.
+ """
# centralize network naming convention
UPLINK = PortPairs.UPLINK
DOWNLINK = PortPairs.DOWNLINK
- def __init__(self, name, vnfd):
- super(VNFObject, self).__init__()
+ def __init__(self, name, vnfd, task_id):
self.name = name
- self.vnfd_helper = VnfdHelper(vnfd) # fixme: parse this into a structure
-
-
-class GenericVNF(VNFObject):
-
- """ Class providing file-like API for generic VNF implementation """
- def __init__(self, name, vnfd):
- super(GenericVNF, self).__init__(name, vnfd)
+ self._task_id = task_id
+ self.vnfd_helper = VnfdHelper(vnfd)
# List of statistics we can obtain from this VNF
# - ETSI MANO 6.3.1.1 monitoring_parameter
- self.kpi = self._get_kpi_definition()
+ self.kpi = self.vnfd_helper.kpi
# Standard dictionary containing params like thread no, buffer size etc
self.config = {}
self.runs_traffic = False
- def _get_kpi_definition(self):
- """ Get list of KPIs defined in VNFD
-
- :param vnfd:
- :return: list of KPIs, e.g. ['throughput', 'latency']
- """
- return self.vnfd_helper.kpi
-
+ @abc.abstractmethod
def instantiate(self, scenario_cfg, context_cfg):
- """ Prepare VNF for operation and start the VNF process/VM
+ """Prepare VNF for operation and start the VNF process/VM
- :param scenario_cfg:
- :param context_cfg:
+ :param scenario_cfg: Scenario config
+ :param context_cfg: Context config
:return: True/False
"""
- raise NotImplementedError()
+ @abc.abstractmethod
def wait_for_instantiate(self):
- """ Wait for VNF to start
+ """Wait for VNF to start
:return: True/False
"""
- raise NotImplementedError()
+ @abc.abstractmethod
def terminate(self):
- """ Kill all VNF processes
-
- :return:
- """
- raise NotImplementedError()
+ """Kill all VNF processes"""
+ @abc.abstractmethod
def scale(self, flavor=""):
- """
+ """rest
- :param flavor:
+ :param flavor: Name of the flavor.
:return:
"""
- raise NotImplementedError()
+ @abc.abstractmethod
def collect_kpi(self):
- """This method should return a dictionary containing the
- selected KPI at a given point of time.
+ """Return a dict containing the selected KPI at a given point of time
:return: {"kpi": value, "kpi2": value}
"""
- raise NotImplementedError()
+
+ @abc.abstractmethod
+ def start_collect(self):
+ """Start KPI collection
+ :return: None
+ """
+
+ @abc.abstractmethod
+ def stop_collect(self):
+ """Stop KPI collection
+ :return: None
+ """
@six.add_metaclass(abc.ABCMeta)
class GenericTrafficGen(GenericVNF):
- """ Class providing file-like API for generic traffic generator """
+ """Class providing file-like API for generic traffic generator"""
- def __init__(self, name, vnfd):
- super(GenericTrafficGen, self).__init__(name, vnfd)
+ def __init__(self, name, vnfd, task_id):
+ super(GenericTrafficGen, self).__init__(name, vnfd, task_id)
self.runs_traffic = True
self.traffic_finished = False
+ self._mq_producer = None
@abc.abstractmethod
def run_traffic(self, traffic_profile):
:return: True/False
"""
pass
+
+ def start_collect(self):
+ """Start KPI collection.
+
+ Traffic measurements are always collected during injection.
+
+ Optional.
+
+ :return: True/False
+ """
+ pass
+
+ def stop_collect(self):
+ """Stop KPI collection.
+
+ Optional.
+
+ :return: True/False
+ """
+ pass
+
+ @staticmethod
+ def _setup_mq_producer(id):
+ """Setup the TG MQ producer to send messages between processes
+
+ :return: (derived class from ``MessagingProducer``) MQ producer object
+ """
+ return TrafficGeneratorProducer(id)
+
+ def get_mq_producer_id(self):
+ """Return the MQ producer ID if initialized"""
+ if self._mq_producer:
+ return self._mq_producer.id