'binding:vnic_type': vnic_type
}
}
+ subnet_id = chain_network.get_subnet_uuid()
+ if subnet_id:
+ body['port']['fixed_ips'] = [{'subnet_id': subnet_id}]
+
port = self.manager.neutron_client.create_port(body)
self.port = port['port']
LOG.info('Created port %s', name)
self.name = self.name + suffix
self.segmentation_id = self._get_item(network_config.segmentation_id,
chain_id, auto_index=True)
+ self.subnet_name = self._get_item(network_config.subnet, chain_id)
self.physical_network = self._get_item(network_config.physical_network, chain_id)
self.reuse = False
self.network = None
self.vlan = None
+ self.router_name = None
if manager.config.l3_router and hasattr(network_config, 'router_name'):
self.router_name = network_config.router_name
try:
return item_field[index]
except IndexError:
raise ChainException("List %s is too short for chain index %d" %
- (str(item_field), index))
+ (str(item_field), index)) from IndexError
# single value is configured
if auto_index:
return item_field + index
"""
return self.network['id']
+ def get_subnet_uuid(self):
+ """
+ Extract UUID of this subnet network.
+
+ :return: UUID of this subnet network
+ """
+ for subnet in self.network['subnets']:
+ if self.subnet_name == self.manager.neutron_client \
+ .show_subnet(subnet)['subnet']['name']:
+ return subnet
+ return None
+
def get_vlan(self):
"""
Extract vlan for this network.
return self.network['provider:segmentation_id']
+ def get_mpls_inner_label(self):
+ """
+ Extract MPLS VPN Label for this network.
+
+ :return: MPLS VPN Label for this network
+ """
+
+ return self.network['provider:segmentation_id']
+
def delete(self):
"""Delete this network."""
if not self.reuse and self.network:
else:
tg_gateway1_ip = devices[LEFT].tg_gateway_ip_addrs
tg_gateway2_ip = devices[RIGHT].tg_gateway_ip_addrs
- tg_mac1 = remote_mac_pair[0]
- tg_mac2 = remote_mac_pair[1]
+ if not config.loop_vm_arp:
+ tg_mac1 = remote_mac_pair[0]
+ tg_mac2 = remote_mac_pair[1]
+ else:
+ tg_mac1 = ""
+ tg_mac2 = ""
g1cidr = devices[LEFT].get_gw_ip(
self.chain.chain_id) + self.__get_network_mask(
if port_index:
# this will pick the last item in array
port_index = -1
- return self.networks[port_index].get_vlan()
+ # This string filters networks connected to TG, in case of
+ # l3-router feature we have 4 networks instead of 2
+ networks = [x for x in self.networks if not x.router_name]
+ return networks[port_index].get_vlan()
def get_vxlan(self, port_index):
"""Get the VXLAN id on a given port.
port_index = -1
return self.networks[port_index].get_vxlan()
+ def get_mpls_inner_label(self, port_index):
+ """Get the MPLS VPN Label on a given port.
+
+ port_index: left port is 0, right port is 1
+ return: the mpls_label_id or None if there is no mpls
+ """
+ # for port 1 we need to return the MPLS Label of the last network in the chain
+ # The networks array contains 2 networks for PVP [left, right]
+ # and 3 networks in the case of PVVP [left.middle,right]
+ if port_index:
+ # this will pick the last item in array
+ port_index = -1
+ return self.networks[port_index].get_mpls_inner_label()
+
def get_dest_mac(self, port_index):
"""Get the dest MAC on a given port.
self.vlans = [self._check_list('vlans[0]', self.config.vlans[0], re_vlan),
self._check_list('vlans[1]', self.config.vlans[1], re_vlan)]
except IndexError:
- raise ChainException('vlans parameter is mandatory. Set valid value in config file')
+ raise ChainException(
+ 'vlans parameter is mandatory. Set valid value in config file') from IndexError
def _get_dest_macs_from_config(self):
re_mac = "[0-9a-fA-F]{2}([-:])[0-9a-fA-F]{2}(\\1[0-9a-fA-F]{2}){4}$"
# if it is a single int or mac, make it a list of 1 int
if isinstance(ll, (int, str)):
ll = [ll]
+ else:
+ ll = list(ll)
for item in ll:
if not re.match(pattern, str(item)):
raise ChainException("Invalid format '{item}' specified in {fname}"
for chain in self.chains:
instances.extend(chain.get_instances())
initial_instance_count = len(instances)
- # Give additional 10 seconds per VM
max_retries = (self.config.check_traffic_time_sec + (initial_instance_count - 1) * 10 +
self.config.generic_poll_sec - 1) / self.config.generic_poll_sec
retry = 0
lookup_only = True
ext_net = self.config.external_networks
net_cfg = [AttrDict({'name': name,
+ 'subnet': None,
'segmentation_id': None,
'physical_network': None})
for name in [ext_net.left, ext_net.right]]
# no openstack
raise ChainException('VxLAN is only supported with OpenStack and with admin user')
+ def get_chain_mpls_inner_labels(self, port_index):
+ """Get the list of per chain MPLS VPN Labels on a given port.
+
+ port_index: left port is 0, right port is 1
+ return: a MPLSs ID list indexed by the chain index or None if no mpls
+ """
+ if self.chains and self.is_admin:
+ return [self.chains[chain_index].get_mpls_inner_label(port_index)
+ for chain_index in range(self.chain_count)]
+ # no openstack
+ raise ChainException('MPLS is only supported with OpenStack and with admin user')
+
def get_dest_macs(self, port_index):
"""Get the list of per chain dest MACs on a given port.