return separator.join('{:02x}'.format(octet) for octet in address.packed)
+def get_mask_from_ip_range(ip_low, ip_high):
+ _ip_low = ipaddress.ip_address(ip_low)
+ _ip_high = ipaddress.ip_address(ip_high)
+ _ip_low_int = int(_ip_low)
+ _ip_high_int = int(_ip_high)
+ return _ip_high.max_prefixlen - (_ip_high_int ^ _ip_low_int).bit_length()
+
+
def try_int(s, *args):
"""Convert to integer if possible."""
try:
PROTO_TCP = 'tcp'
PROTO_VLAN = 'vlan'
-IP_VERSION_4_MASK = '0.0.0.255'
-IP_VERSION_6_MASK = '0:0:0:0:0:0:0:ff'
+IP_VERSION_4_MASK = 24
+IP_VERSION_6_MASK = 64
TRAFFIC_STATUS_STARTED = 'started'
TRAFFIC_STATUS_STOPPED = 'stopped'
raise exceptions.IxNetworkFlowNotPresent(flow_group=fg_id)
count = traffic_param['outer_l3']['count']
- srcip4 = str(traffic_param['outer_l3']['srcip4'])
- dstip4 = str(traffic_param['outer_l3']['dstip4'])
+ srcip = str(traffic_param['outer_l3']['srcip'])
+ dstip = str(traffic_param['outer_l3']['dstip'])
+ srcmask = traffic_param['outer_l3']['srcmask'] or IP_VERSION_4_MASK
+ dstmask = traffic_param['outer_l3']['dstmask'] or IP_VERSION_4_MASK
self._update_ipv4_address(
self._get_stack_item(fg_id, PROTO_IPV4)[0],
- 'srcIp', srcip4, 1, IP_VERSION_4_MASK, count)
+ 'srcIp', srcip, 1, srcmask, count)
self._update_ipv4_address(
self._get_stack_item(fg_id, PROTO_IPV4)[0],
- 'dstIp', dstip4, 1, IP_VERSION_4_MASK, count)
+ 'dstIp', dstip, 1, dstmask, count)
def _build_stats_map(self, view_obj, name_map):
return {data_yardstick: self.ixnet.execute(
import logging
+from yardstick.common import utils
from yardstick.network_services.traffic_profile import base as tp_base
from yardstick.network_services.traffic_profile import trex_traffic_profile
self.rate = self.config.frame_rate
self.rate_unit = self.config.rate_unit
+ def _get_ip_and_mask(self, ip_range):
+ _ip_range = ip_range.split('-')
+ if len(_ip_range) == 1:
+ return _ip_range[0], None
+
+ mask = utils.get_mask_from_ip_range(_ip_range[0], _ip_range[1])
+ return _ip_range[0], mask
+
def _get_ixia_traffic_profile(self, profile_data, mac=None):
mac = {} if mac is None else mac
result = {}
port_id = value.get('id', 1)
port_index = port_id - 1
- try:
- ip = value['outer_l3v6']
- except KeyError:
+
+ if value.get('outer_l3v4'):
ip = value['outer_l3v4']
src_key, dst_key = 'srcip4', 'dstip4'
else:
+ ip = value['outer_l3v6']
src_key, dst_key = 'srcip6', 'dstip6'
+ srcip, srcmask = self._get_ip_and_mask(ip[src_key])
+ dstip, dstmask = self._get_ip_and_mask(ip[dst_key])
+
result[traffickey] = {
'bidir': False,
'id': port_id,
'count': ip['count'],
'dscp': ip['dscp'],
'ttl': ip['ttl'],
- src_key: ip[src_key].split("-")[0],
- dst_key: ip[dst_key].split("-")[0],
+ 'srcip': srcip,
+ 'dstip': dstip,
+ 'srcmask': srcmask,
+ 'dstmask': dstmask,
'type': key,
'proto': ip['proto'],
},
for value in chain(value_iter, self.INVALID_IP_ADDRESS_STR_LIST):
self.assertEqual(utils.ip_to_hex(value), value)
+ def test_get_mask_from_ip_range_ipv4(self):
+ ip_str = '1.1.1.1'
+ for mask in range(8, 30):
+ ip = ipaddress.ip_network(ip_str + '/' + str(mask), strict=False)
+ result = utils.get_mask_from_ip_range(ip[2], ip[-2])
+ self.assertEqual(mask, result)
+
+ def test_get_mask_from_ip_range_ipv6(self):
+ ip_str = '2001::1'
+ for mask in range(8, 120):
+ ip = ipaddress.ip_network(ip_str + '/' + str(mask), strict=False)
+ result = utils.get_mask_from_ip_range(ip[2], ip[-2])
+ self.assertEqual(mask, result)
+
class SafeDecodeUtf8TestCase(ut_base.BaseUnitTestCase):
'outer_l3': {
'count': 512,
'dscp': 0,
- 'dstip4': '152.16.40.20',
'proto': 'udp',
- 'srcip4': '152.16.100.20',
- 'ttl': 32
- },
- 'outer_l3v4': {
- 'dscp': 0,
- 'dstip4': '152.16.40.20',
- 'proto': 'udp',
- 'srcip4': '152.16.100.20',
- 'ttl': 32
- },
- 'outer_l3v6': {
- 'count': 1024,
- 'dscp': 0,
- 'dstip4': '152.16.100.20',
- 'proto': 'udp',
- 'srcip4': '152.16.40.20',
- 'ttl': 32
+ 'ttl': 32,
+ 'dstip': '152.16.40.20',
+ 'srcip': '152.16.100.20',
+ 'dstmask': 24,
+ 'srcmask': 24
},
'outer_l4': {
'dstport': '2001',
'outer_l3': {
'count': 1024,
'dscp': 0,
- 'dstip4': '152.16.100.20',
- 'proto': 'udp',
- 'srcip4': '152.16.40.20',
- 'ttl': 32
- },
- 'outer_l3v4': {
- 'count': 1024,
- 'dscp': 0,
- 'dstip4': '152.16.100.20',
- 'proto': 'udp',
- 'srcip4': '152.16.40.20',
- 'ttl': 32
- },
- 'outer_l3v6': {
- 'count': 1024,
- 'dscp': 0,
- 'dstip4': '152.16.100.20',
'proto': 'udp',
- 'srcip4': '152.16.40.20',
- 'ttl': 32
+ 'ttl': 32,
+ 'dstip': '2001::10',
+ 'srcip': '2021::10',
+ 'dstmask': 64,
+ 'srcmask': 64
},
'outer_l4': {
'dstport': '1234',
r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(t_profile_data)
self.assertEqual(12345678, r_f_c2544_profile.rate)
+ def test__get_ip_and_mask_range(self):
+ ip_range = '1.2.0.2-1.2.255.254'
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range)
+ self.assertEqual('1.2.0.2', ip)
+ self.assertEqual(16, mask)
+
+ def test__get_ip_and_mask_single(self):
+ ip_range = '192.168.1.10'
+ r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
+ self.TRAFFIC_PROFILE)
+ ip, mask = r_f_c2544_profile._get_ip_and_mask(ip_range)
+ self.assertEqual('192.168.1.10', ip)
+ self.assertIsNone(mask)
+
def test__get_ixia_traffic_profile_default_args(self):
r_f_c2544_profile = ixia_rfc2544.IXIARFC2544Profile(
self.TRAFFIC_PROFILE)