[NFVBENCH-67] NFVbench should have same TX rate for different rate formats
[nfvbench.git] / nfvbench / traffic_client.py
old mode 100644 (file)
new mode 100755 (executable)
index 8bfcd76..bdcc027
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from datetime import datetime
+import re
+import socket
+import struct
+import time
+
 from attrdict import AttrDict
 import bitmath
-from datetime import datetime
-from log import LOG
 from netaddr import IPNetwork
+# pylint: disable=import-error
+from trex_stl_lib.api import STLError
+# pylint: enable=import-error
+
+from log import LOG
 from network import Interface
-import socket
 from specs import ChainType
 from stats_collector import IntervalCollector
 from stats_collector import IterationCollector
-import struct
-import time
 import traffic_gen.traffic_utils as utils
+from utils import cast_integer
 
 
 class TrafficClientException(Exception):
@@ -32,7 +39,6 @@ class TrafficClientException(Exception):
 
 
 class TrafficRunner(object):
-
     def __init__(self, client, duration_sec, interval_sec=0):
         self.client = client
         self.start_time = None
@@ -57,12 +63,14 @@ class TrafficRunner(object):
     def time_elapsed(self):
         if self.is_running():
             return time.time() - self.start_time
-        else:
-            return self.duration_sec
+        return self.duration_sec
 
     def poll_stats(self):
         if not self.is_running():
             return None
+        if self.client.skip_sleep:
+            self.stop()
+            return self.client.get_stats()
         time_elapsed = self.time_elapsed()
         if time_elapsed > self.duration_sec:
             self.stop()
@@ -80,11 +88,42 @@ class TrafficRunner(object):
         return self.client.get_stats()
 
 
-class Device(object):
+class IpBlock(object):
+    def __init__(self, base_ip, step_ip, count_ip):
+        self.base_ip_int = Device.ip_to_int(base_ip)
+        self.step = Device.ip_to_int(step_ip)
+        self.max_available = count_ip
+        self.next_free = 0
+
+    def get_ip(self, index=0):
+        '''Return the IP address at given index
+        '''
+        if index < 0 or index >= self.max_available:
+            raise IndexError('Index out of bounds')
+        return Device.int_to_ip(self.base_ip_int + index * self.step)
 
+    def reserve_ip_range(self, count):
+        '''Reserve a range of count consecutive IP addresses spaced by step
+        '''
+        if self.next_free + count > self.max_available:
+            raise IndexError('No more IP addresses next free=%d max_available=%d requested=%d' %
+                             (self.next_free,
+                              self.max_available,
+                              count))
+        first_ip = self.get_ip(self.next_free)
+        last_ip = self.get_ip(self.next_free + count - 1)
+        self.next_free += count
+        return (first_ip, last_ip)
+
+    def reset_reservation(self):
+        self.next_free = 0
+
+
+class Device(object):
     def __init__(self, port, pci, switch_port=None, vtep_vlan=None, ip=None, tg_gateway_ip=None,
                  gateway_ip=None, ip_addrs_step=None, tg_gateway_ip_addrs_step=None,
-                 gateway_ip_addrs_step=None, chain_count=1, flow_count=1, vlan_tagging=False):
+                 gateway_ip_addrs_step=None, udp_src_port=None, udp_dst_port=None,
+                 dst_mac=None, chain_count=1, flow_count=1, vlan_tagging=False):
         self.chain_count = chain_count
         self.flow_count = flow_count
         self.dst = None
@@ -95,6 +134,7 @@ class Device(object):
         self.vlan_tagging = vlan_tagging
         self.pci = pci
         self.mac = None
+        self.dst_mac = dst_mac
         self.vm_mac_list = None
         subnet = IPNetwork(ip)
         self.ip = subnet.ip.format()
@@ -102,15 +142,17 @@ class Device(object):
         self.ip_addrs_step = ip_addrs_step
         self.tg_gateway_ip_addrs_step = tg_gateway_ip_addrs_step
         self.gateway_ip_addrs_step = gateway_ip_addrs_step
-        self.ip_list = self.expand_ip(self.ip, self.ip_addrs_step, self.flow_count)
         self.gateway_ip = gateway_ip
-        self.gateway_ip_list = self.expand_ip(self.gateway_ip,
-                                              self.gateway_ip_addrs_step,
-                                              self.chain_count)
         self.tg_gateway_ip = tg_gateway_ip
-        self.tg_gateway_ip_list = self.expand_ip(self.tg_gateway_ip,
-                                                 self.tg_gateway_ip_addrs_step,
-                                                 self.chain_count)
+        self.ip_block = IpBlock(self.ip, ip_addrs_step, flow_count)
+        self.gw_ip_block = IpBlock(gateway_ip,
+                                   gateway_ip_addrs_step,
+                                   chain_count)
+        self.tg_gw_ip_block = IpBlock(tg_gateway_ip,
+                                      tg_gateway_ip_addrs_step,
+                                      chain_count)
+        self.udp_src_port = udp_src_port
+        self.udp_dst_port = udp_dst_port
 
     def set_mac(self, mac):
         if mac is None:
@@ -128,53 +170,65 @@ class Device(object):
             raise TrafficClientException('Trying to set VLAN tag as None')
         self.vlan_tag = vlan_tag
 
+    def get_gw_ip(self, chain_index):
+        '''Retrieve the IP address assigned for the gateway of a given chain
+        '''
+        return self.gw_ip_block.get_ip(chain_index)
+
     def get_stream_configs(self, service_chain):
         configs = []
-        flow_idx = 0
+        # exact flow count for each chain is calculated as follows:
+        # - all chains except the first will have the same flow count
+        #   calculated as (total_flows + chain_count - 1) / chain_count
+        # - the first chain will have the remainder
+        # example 11 flows and 3 chains => 3, 4, 4
+        flows_per_chain = (self.flow_count + self.chain_count - 1) / self.chain_count
+        cur_chain_flow_count = self.flow_count - flows_per_chain * (self.chain_count - 1)
+
+        self.ip_block.reset_reservation()
+        self.dst.ip_block.reset_reservation()
+
         for chain_idx in xrange(self.chain_count):
-            current_flow_count = (self.flow_count - flow_idx) / (self.chain_count - chain_idx)
-            max_idx = flow_idx + current_flow_count - 1
-            ip_src_count = self.ip_to_int(self.ip_list[max_idx]) - \
-                self.ip_to_int(self.ip_list[flow_idx]) + 1
-            ip_dst_count = self.ip_to_int(self.dst.ip_list[max_idx]) - \
-                self.ip_to_int(self.dst.ip_list[flow_idx]) + 1
+            src_ip_first, src_ip_last = self.ip_block.reserve_ip_range(cur_chain_flow_count)
+            dst_ip_first, dst_ip_last = self.dst.ip_block.reserve_ip_range(cur_chain_flow_count)
+
+            dst_mac = self.dst_mac[chain_idx] if self.dst_mac is not None else self.dst.mac
+            if not re.match("[0-9a-f]{2}([-:])[0-9a-f]{2}(\\1[0-9a-f]{2}){4}$", dst_mac.lower()):
+                raise TrafficClientException("Invalid MAC address '{mac}' specified in "
+                                             "mac_addrs_left/right".format(mac=dst_mac))
 
             configs.append({
-                'count': current_flow_count,
+                'count': cur_chain_flow_count,
                 'mac_src': self.mac,
-                'mac_dst': self.dst.mac if service_chain == ChainType.EXT
-                else self.vm_mac_list[chain_idx],
-                'ip_src_addr': self.ip_list[flow_idx],
-                'ip_src_addr_max': self.ip_list[max_idx],
-                'ip_src_count': ip_src_count,
-                'ip_dst_addr': self.dst.ip_list[flow_idx],
-                'ip_dst_addr_max': self.dst.ip_list[max_idx],
-                'ip_dst_count': ip_dst_count,
+                'mac_dst': dst_mac if service_chain == ChainType.EXT else self.vm_mac_list[
+                    chain_idx],
+                'ip_src_addr': src_ip_first,
+                'ip_src_addr_max': src_ip_last,
+                'ip_src_count': cur_chain_flow_count,
+                'ip_dst_addr': dst_ip_first,
+                'ip_dst_addr_max': dst_ip_last,
+                'ip_dst_count': cur_chain_flow_count,
                 'ip_addrs_step': self.ip_addrs_step,
-                'mac_discovery_gw': self.gateway_ip_list[chain_idx],
-                'ip_src_tg_gw': self.tg_gateway_ip_list[chain_idx],
-                'ip_dst_tg_gw': self.dst.tg_gateway_ip_list[chain_idx],
+                'udp_src_port': self.udp_src_port,
+                'udp_dst_port': self.udp_dst_port,
+                'mac_discovery_gw': self.get_gw_ip(chain_idx),
+                'ip_src_tg_gw': self.tg_gw_ip_block.get_ip(chain_idx),
+                'ip_dst_tg_gw': self.dst.tg_gw_ip_block.get_ip(chain_idx),
                 'vlan_tag': self.vlan_tag if self.vlan_tagging else None
             })
+            # after first chain, fall back to the flow count for all other chains
+            cur_chain_flow_count = flows_per_chain
 
-            flow_idx += current_flow_count
         return configs
 
-    @classmethod
-    def expand_ip(cls, ip, step_ip, count):
-        if step_ip == 'random':
-            # Repeatable Random will used in the stream src/dst IP pairs, but we still need
-            # to expand the IP based on the number of chains and flows configured. So we use
-            # "0.0.0.1" as the step to have the exact IP flow ranges for every chain.
-            step_ip = '0.0.0.1'
-
-        step_ip_in_int = cls.ip_to_int(step_ip)
-        subnet = IPNetwork(ip)
-        ip_list = []
-        for _ in xrange(count):
-            ip_list.append(subnet.ip.format())
-            subnet = subnet.next(step_ip_in_int)
-        return ip_list
+    def ip_range_overlaps(self):
+        '''Check if this device ip range is overlapping with the dst device ip range
+        '''
+        src_base_ip = Device.ip_to_int(self.ip)
+        dst_base_ip = Device.ip_to_int(self.dst.ip)
+        src_last_ip = src_base_ip + self.flow_count - 1
+        dst_last_ip = dst_base_ip + self.flow_count - 1
+        return dst_last_ip >= src_base_ip and src_last_ip >= dst_base_ip
 
     @staticmethod
     def mac_to_int(mac):
@@ -190,6 +244,10 @@ class Device(object):
     def ip_to_int(addr):
         return struct.unpack("!I", socket.inet_aton(addr))[0]
 
+    @staticmethod
+    def int_to_ip(nvalue):
+        return socket.inet_ntoa(struct.pack("!I", nvalue))
+
 
 class RunningTrafficProfile(object):
     """Represents traffic configuration for currently running traffic profile."""
@@ -220,6 +278,8 @@ class RunningTrafficProfile(object):
         self.src_device = None
         self.dst_device = None
         self.vm_mac_list = None
+        self.mac_addrs_left = generator_config.mac_addrs_left
+        self.mac_addrs_right = generator_config.mac_addrs_right
         self.__prep_interfaces(generator_config)
 
     def to_json(self):
@@ -234,8 +294,8 @@ class RunningTrafficProfile(object):
         generator_config = AttrDict(traffic_generator)
         generator_config.pop('default_profile')
         generator_config.pop('generator_profile')
-        matching_profile = filter(lambda profile: profile.name == generator_profile,
-                                  traffic_generator.generator_profile)
+        matching_profile = [profile for profile in traffic_generator.generator_profile if
+                            profile.name == generator_profile]
         if len(matching_profile) != 1:
             raise Exception('Traffic generator profile not found: ' + generator_profile)
 
@@ -253,7 +313,10 @@ class RunningTrafficProfile(object):
             'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
             'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[0],
             'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
-            'vlan_tagging': self.vlan_tagging
+            'udp_src_port': generator_config.udp_src_port,
+            'udp_dst_port': generator_config.udp_dst_port,
+            'vlan_tagging': self.vlan_tagging,
+            'dst_mac': generator_config.mac_addrs_left
         }
         dst_config = {
             'chain_count': self.service_chain_count,
@@ -264,7 +327,10 @@ class RunningTrafficProfile(object):
             'gateway_ip_addrs_step': self.gateway_ip_addrs_step,
             'tg_gateway_ip': generator_config.tg_gateway_ip_addrs[1],
             'tg_gateway_ip_addrs_step': self.tg_gateway_ip_addrs_step,
-            'vlan_tagging': self.vlan_tagging
+            'udp_src_port': generator_config.udp_src_port,
+            'udp_dst_port': generator_config.udp_dst_port,
+            'vlan_tagging': self.vlan_tagging,
+            'dst_mac': generator_config.mac_addrs_right
         }
 
         self.src_device = Device(**dict(src_config, **generator_config.interfaces[0]))
@@ -273,14 +339,11 @@ class RunningTrafficProfile(object):
         self.dst_device.set_destination(self.src_device)
 
         if self.service_chain == ChainType.EXT and not self.no_arp \
-                and not self.__are_unique(self.src_device.ip_list, self.dst_device.ip_list):
-            raise Exception('Computed IP addresses are not unique, choose different base. '
-                            'Start IPs: {start}. End IPs: {end}'
-                            .format(start=self.src_device.ip_list,
-                                    end=self.dst_device.ip_list))
-
-    def __are_unique(self, list1, list2):
-        return set(list1).isdisjoint(set(list2))
+                and self.src_device.ip_range_overlaps():
+            raise Exception('Overlapping IP address ranges src=%s dst=%d flows=%d' %
+                            self.src_device.ip,
+                            self.dst_device.ip,
+                            self.flow_count)
 
     @property
     def devices(self):
@@ -304,7 +367,6 @@ class RunningTrafficProfile(object):
 
 
 class TrafficGeneratorFactory(object):
-
     def __init__(self, config):
         self.config = config
 
@@ -319,8 +381,7 @@ class TrafficGeneratorFactory(object):
         elif tool == 'dummy':
             from traffic_gen import dummy
             return dummy.DummyTG(self.config)
-        else:
-            return None
+        return None
 
     def list_generator_profile(self):
         return [profile.name for profile in self.config.traffic_generator.generator_profile]
@@ -329,12 +390,12 @@ class TrafficGeneratorFactory(object):
         return RunningTrafficProfile(self.config, generator_profile)
 
     def get_matching_profile(self, traffic_profile_name):
-        matching_profile = filter(lambda profile: profile.name == traffic_profile_name,
-                                  self.config.traffic_profile)
+        matching_profile = [profile for profile in self.config.traffic_profile if
+                            profile.name == traffic_profile_name]
 
         if len(matching_profile) > 1:
             raise Exception('Multiple traffic profiles with the same name found.')
-        elif len(matching_profile) == 0:
+        elif not matching_profile:
             raise Exception('No traffic profile found.')
 
         return matching_profile[0]
@@ -345,10 +406,9 @@ class TrafficGeneratorFactory(object):
 
 
 class TrafficClient(object):
-
     PORTS = [0, 1]
 
-    def __init__(self, config, notifier=None):
+    def __init__(self, config, notifier=None, skip_sleep=False):
         generator_factory = TrafficGeneratorFactory(config)
         self.gen = generator_factory.get_generator_client()
         self.tool = generator_factory.get_tool()
@@ -364,11 +424,13 @@ class TrafficClient(object):
             'l2frame_size': None,
             'duration_sec': self.config.duration_sec,
             'bidirectional': True,
-            'rates': None
+            'rates': []  # to avoid unsbuscriptable-obj warning
         }
         self.current_total_rate = {'rate_percent': '10'}
         if self.config.single_run:
             self.current_total_rate = utils.parse_rate_str(self.config.rate)
+        # UT with dummy TG can bypass all sleeps
+        self.skip_sleep = skip_sleep
 
     def set_macs(self):
         for mac, device in zip(self.gen.get_macs(), self.config.generator_config.devices):
@@ -410,13 +472,14 @@ class TrafficClient(object):
 
         # ensures enough traffic is coming back
         threshold = (self.config.service_chain_count - 1) / float(self.config.service_chain_count)
-
-        for it in xrange(self.config.generic_retry_count):
+        retry_count = (self.config.check_traffic_time_sec +
+                       self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
+        for it in xrange(retry_count):
             self.gen.clear_stats()
             self.gen.start_traffic()
-            LOG.info('Waiting for packets to be received back... ({} / {})'.format(it + 1,
-                     self.config.generic_retry_count))
-            time.sleep(self.config.generic_poll_sec)
+            LOG.info('Waiting for packets to be received back... (%d / %d)', it + 1, retry_count)
+            if not self.skip_sleep:
+                time.sleep(self.config.generic_poll_sec)
             self.gen.stop_traffic()
             stats = self.gen.get_stats()
 
@@ -436,7 +499,8 @@ class TrafficClient(object):
                 LOG.info('End-to-end connectivity ensured')
                 return
 
-            time.sleep(self.config.generic_poll_sec)
+            if not self.skip_sleep:
+                time.sleep(self.config.generic_poll_sec)
 
         raise TrafficClientException('End-to-end connectivity cannot be ensured')
 
@@ -454,6 +518,10 @@ class TrafficClient(object):
             unidir_reverse_pps = int(self.config.unidir_reverse_traffic_pps)
             if unidir_reverse_pps > 0:
                 self.run_config['rates'].append({'rate_pps': str(unidir_reverse_pps)})
+        # Fix for [NFVBENCH-67], convert the rate string to PPS
+        for idx, rate in enumerate(self.run_config['rates']):
+            if 'rate_pps' not in rate:
+                self.run_config['rates'][idx] = {'rate_pps': self.__convert_rates(rate)['rate_pps']}
 
         self.gen.clear_streamblock()
         self.gen.create_traffic(frame_size, self.run_config['rates'], bidirectional, latency=True)
@@ -508,8 +576,7 @@ class TrafficClient(object):
         total_pkts = result['tx']['total_pkts']
         if not total_pkts:
             return float('inf')
-        else:
-            return float(dropped_pkts) / total_pkts * 100
+        return float(dropped_pkts) / total_pkts * 100
 
     def get_stats(self):
         stats = self.gen.get_stats()
@@ -528,9 +595,12 @@ class TrafficClient(object):
                     retDict[port]['rx'][key] = int(stats[port]['rx'][key])
                 except ValueError:
                     retDict[port]['rx'][key] = 0
-            retDict[port]['rx']['avg_delay_usec'] = float(stats[port]['rx']['avg_delay_usec'])
-            retDict[port]['rx']['min_delay_usec'] = float(stats[port]['rx']['min_delay_usec'])
-            retDict[port]['rx']['max_delay_usec'] = float(stats[port]['rx']['max_delay_usec'])
+            retDict[port]['rx']['avg_delay_usec'] = cast_integer(
+                stats[port]['rx']['avg_delay_usec'])
+            retDict[port]['rx']['min_delay_usec'] = cast_integer(
+                stats[port]['rx']['min_delay_usec'])
+            retDict[port]['rx']['max_delay_usec'] = cast_integer(
+                stats[port]['rx']['max_delay_usec'])
             retDict[port]['drop_rate_percent'] = self.__get_dropped_rate(retDict[port])
 
         ports = sorted(retDict.keys())
@@ -573,7 +643,7 @@ class TrafficClient(object):
         self.interval_collector.add_ndr_pdr(tag, last_stats)
 
     def __format_output_stats(self, stats):
-        for key in (self.PORTS + ['overall']):
+        for key in self.PORTS + ['overall']:
             interface = stats[key]
             stats[key] = {
                 'tx_pkts': interface['tx']['total_pkts'],
@@ -589,7 +659,7 @@ class TrafficClient(object):
 
     def __targets_found(self, rate, targets, results):
         for tag, target in targets.iteritems():
-            LOG.info('Found {} ({}) load: {}'.format(tag, target, rate))
+            LOG.info('Found %s (%s) load: %s', tag, target, rate)
             self.__ndr_pdr_found(tag, rate)
             results[tag]['timestamp_sec'] = time.time()
 
@@ -600,12 +670,13 @@ class TrafficClient(object):
                 indicating the rate to send on each interface
         right   the right side of the range to search as a % of line rate
                 indicating the rate to send on each interface
-        targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag" ('ndr', 'pdr')
+        targets a dict of drop rates to search (0.1 = 0.1%), indexed by the DR name or "tag"
+                ('ndr', 'pdr')
         results a dict to store results
         '''
-        if len(targets) == 0:
+        if not targets:
             return
-        LOG.info('Range search [{} .. {}] targets: {}'.format(left, right, targets))
+        LOG.info('Range search [%s .. %s] targets: %s', left, right, targets)
 
         # Terminate search when gap is less than load epsilon
         if right - left < self.config.measurement.load_epsilon:
@@ -614,8 +685,12 @@ class TrafficClient(object):
 
         # Obtain the average drop rate in for middle load
         middle = (left + right) / 2.0
-        stats, rates = self.__run_search_iteration(middle)
-
+        try:
+            stats, rates = self.__run_search_iteration(middle)
+        except STLError:
+            LOG.exception("Got exception from traffic generator during binary search")
+            self.__targets_found(left, targets, results)
+            return
         # Split target dicts based on the avg drop rate
         left_targets = {}
         right_targets = {}
@@ -630,6 +705,21 @@ class TrafficClient(object):
                 })
                 right_targets[tag] = target
             else:
+                # initialize to 0 all fields of result for
+                # the worst case scenario of the binary search (if ndr/pdr is not found)
+                if tag not in results:
+                    results[tag] = dict.fromkeys(rates, 0)
+                    empty_stats = self.__format_output_stats(dict(stats))
+                    for key in empty_stats:
+                        if isinstance(empty_stats[key], dict):
+                            empty_stats[key] = dict.fromkeys(empty_stats[key], 0)
+                        else:
+                            empty_stats[key] = 0
+                    results[tag].update({
+                        'load_percent_per_direction': 0,
+                        'stats': empty_stats,
+                        'timestamp_sec': None
+                    })
                 left_targets[tag] = target
 
         # search lower half
@@ -665,7 +755,7 @@ class TrafficClient(object):
 
         # save reliable stats from whole iteration
         self.iteration_collector.add(stats, current_traffic_config['direction-total']['rate_pps'])
-        LOG.info('Average drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+        LOG.info('Average drop rate: %f', stats['overall']['drop_rate_percent'])
 
         return stats, current_traffic_config['direction-total']
 
@@ -693,7 +783,7 @@ class TrafficClient(object):
             if stats is None:
                 return
         self.log_stats(stats)
-        LOG.info('Drop rate: {}'.format(stats['overall']['drop_rate_percent']))
+        LOG.info('Drop rate: %f', stats['overall']['drop_rate_percent'])
         yield stats
 
     def cancel_traffic(self):
@@ -727,7 +817,7 @@ class TrafficClient(object):
         config['direction-total'] = dict(config['direction-forward'])
         config['direction-total'].update({
             'rate_percent': load_total,
-            'rate_pps': pps_total,
+            'rate_pps': cast_integer(pps_total),
             'rate_bps': bps_total
         })
 
@@ -749,7 +839,8 @@ class TrafficClient(object):
         for direction in ['orig', 'tx', 'rx']:
             total[direction] = {}
             for unit in ['rate_percent', 'rate_bps', 'rate_pps']:
-                total[direction][unit] = sum(map(lambda x: float(x[direction][unit]), r.values()))
+
+                total[direction][unit] = sum([float(x[direction][unit]) for x in r.values()])
 
         r['direction-total'] = total
         return r