self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.downlink_ports)
self.all_ports = self.vnfd_helper.port_nums(self.vnfd_helper.port_pairs.all_ports)
+ def port_num(self, intf):
+ # by default return port num
+ return self.vnfd_helper.port_num(intf)
+
def get_stats(self, *args, **kwargs):
try:
return self.client.get_stats(*args, **kwargs)
self._tg_process = Process(name=name, target=self._start_server)
self._tg_process.start()
- def wait_for_instantiate(self):
- # overridden by subclasses
- return self._wait_for_process()
-
def _check_status(self):
raise NotImplementedError
return self._traffic_process.is_alive()
- def listen_traffic(self, traffic_profile):
- """ Listen to traffic with the given parameters.
- Method is non-blocking, returns immediately when traffic process
- is running. Optional.
-
- :param traffic_profile:
- :return: True/False
- """
- pass
-
- def verify_traffic(self, traffic_profile):
- """ Verify captured traffic after it has ended. Optional.
-
- :param traffic_profile:
- :return: dict
- """
- pass
-
def collect_kpi(self):
# check if the tg processes have exited
for proc in (self._tg_process, self._traffic_process):