# See the License for the specific language governing permissions and
# limitations under the License.
import enum
+from neutronclient.common.utils import str2bool
from snaps.openstack.utils import keystone_utils, neutron_utils
:param segmentation_id: the id of the segmentation
(this is required when network_type is 'vlan')
:param subnets or subnet_settings: List of SubnetConfig objects.
+ :param mtu: MTU setting (optional)
:return:
"""
self.name = kwargs.get('name')
if kwargs.get('admin_state_up') is not None:
- self.admin_state_up = bool(kwargs['admin_state_up'])
+ self.admin_state_up = str2bool(str(kwargs['admin_state_up']))
else:
self.admin_state_up = True
if kwargs.get('shared') is not None:
- self.shared = bool(kwargs['shared'])
+ self.shared = str2bool(str(kwargs['shared']))
else:
self.shared = None
self.project_name = kwargs.get('project_name')
if kwargs.get('external') is not None:
- self.external = bool(kwargs.get('external'))
+ self.external = str2bool(str(kwargs.get('external')))
else:
self.external = False
if not self.name or len(self.name) < 1:
raise NetworkConfigError('Name required for networks')
+ self.mtu = kwargs.get('mtu')
+
def get_project_id(self, os_creds):
"""
Returns the project ID for a given project_name or None
return self.project_id
else:
if self.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
- project = keystone_utils.get_project(
- keystone=keystone, project_name=self.project_name)
- if project:
- return project.id
+ session = keystone_utils.keystone_session(os_creds)
+ keystone = keystone_utils.keystone_client(os_creds, session)
+ try:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ if project:
+ return project.id
+ finally:
+ keystone_utils.close_session(session)
return None
out['provider:segmentation_id'] = self.segmentation_id
if self.external:
out['router:external'] = self.external
+ if self.mtu:
+ out['mtu'] = self.mtu
return {'network': out}
through authorization policies (optional)
:param start: The start address for the allocation pools (optional)
:param end: The end address for the allocation pools (optional)
- :param gateway_ip: The gateway IP address (optional)
+ :param gateway_ip: The gateway IP address (optional). When not
+ configured, the IP address will be automatically
+ assigned; when 'none', no gateway address will be
+ assigned, else the value must be valid
:param enable_dhcp: Set to true if DHCP is enabled and false if DHCP is
disabled (optional)
:param dns_nameservers: A list of DNS name servers for the subnet.
Specify each name server as an IP address
and separate multiple entries with a space.
For example [8.8.8.7 8.8.8.8]
- (default '8.8.8.8')
+ (default [])
:param host_routes: A list of host route dictionaries for the subnet.
For example:
"host_routes":[
if 'dns_nameservers' in kwargs:
self.dns_nameservers = kwargs.get('dns_nameservers')
else:
- if self.ip_version == 4:
- self.dns_nameservers = ['8.8.8.8']
- else:
- self.dns_nameservers = list()
+ self.dns_nameservers = list()
self.host_routes = kwargs.get('host_routes')
self.destination = kwargs.get('destination')
if self.name:
out['name'] = self.name
if self.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
- project = keystone_utils.get_project(
- keystone=keystone, project_name=self.project_name)
+ session = keystone_utils.keystone_session(os_creds)
+ keystone = keystone_utils.keystone_client(os_creds, session)
+ try:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ finally:
+ keystone_utils.close_session(session)
project_id = None
if project:
project_id = project.id
if self.start and self.end:
out['allocation_pools'] = [{'start': self.start, 'end': self.end}]
if self.gateway_ip:
- out['gateway_ip'] = self.gateway_ip
+ if self.gateway_ip == 'none':
+ out['gateway_ip'] = None
+ else:
+ out['gateway_ip'] = self.gateway_ip
if self.enable_dhcp is not None:
out['enable_dhcp'] = self.enable_dhcp
if self.dns_nameservers and len(self.dns_nameservers) > 0:
self.fixed_ips. These values will be directly
translated into the fixed_ips dict (optional)
:param security_groups: One or more security group IDs.
+ :param port_security_enabled: When True, security groups will be
+ applied to the port else not
+ (default - True)
:param allowed_address_pairs: A dictionary containing a set of zero or
more allowed address pairs. An address
pair contains an IP address and MAC
For example, a DHCP agent (optional)
:param device_id: The ID of the device that uses this port.
For example, a virtual server (optional)
+ :param extra_dhcp_opts: k/v of options to use with your DHCP (optional)
:return:
"""
if 'port' in kwargs:
self.network_name = kwargs.get('network_name')
if kwargs.get('admin_state_up') is not None:
- self.admin_state_up = bool(kwargs['admin_state_up'])
+ self.admin_state_up = str2bool(str(kwargs['admin_state_up']))
else:
self.admin_state_up = True
self.mac_address = kwargs.get('mac_address')
self.ip_addrs = kwargs.get('ip_addrs')
self.security_groups = kwargs.get('security_groups')
+
+ if kwargs.get('port_security_enabled') is not None:
+ self.port_security_enabled = str2bool(
+ str(kwargs['port_security_enabled']))
+ else:
+ self.port_security_enabled = None
+
self.allowed_address_pairs = kwargs.get('allowed_address_pairs')
self.opt_value = kwargs.get('opt_value')
self.opt_name = kwargs.get('opt_name')
self.device_owner = kwargs.get('device_owner')
self.device_id = kwargs.get('device_id')
+ self.extra_dhcp_opts = kwargs.get('extra_dhcp_opts')
if not self.network_name:
raise PortConfigError(
'The attribute network_name is required')
- def __get_fixed_ips(self, neutron):
+ def __get_fixed_ips(self, neutron, network):
"""
Sets the self.fixed_ips value
:param neutron: the Neutron client
+ :param network: the SNAPS-OO network domain object
:return: None
"""
-
fixed_ips = list()
if self.ip_addrs:
for ip_addr_dict in self.ip_addrs:
subnet = neutron_utils.get_subnet(
- neutron, subnet_name=ip_addr_dict['subnet_name'])
- if subnet and 'ip' in ip_addr_dict:
- fixed_ips.append({'ip_address': ip_addr_dict['ip'],
- 'subnet_id': subnet.id})
+ neutron, network, subnet_name=ip_addr_dict['subnet_name'])
+ if subnet:
+ if 'ip' in ip_addr_dict:
+ fixed_ips.append({'ip_address': ip_addr_dict['ip'],
+ 'subnet_id': subnet.id})
else:
raise PortConfigError(
'Invalid port configuration, subnet does not exist '
TODO - expand automated testing to exercise all parameters
:param neutron: the Neutron client
+ :param keystone: the Keystone client
:param os_creds: the OpenStack credentials
:return: the dictionary object
"""
-
out = dict()
+ session = keystone_utils.keystone_session(os_creds)
+ keystone = keystone_utils.keystone_client(os_creds, session)
- project_id = None
+ project_name = os_creds.project_name
if self.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
- project = keystone_utils.get_project(
- keystone=keystone, project_name=self.project_name)
- if project:
- project_id = project.id
+ project_name = project_name
+ try:
+ network = neutron_utils.get_network(
+ neutron, keystone, network_name=self.network_name)
+ if network and not network.shared:
+ network = neutron_utils.get_network(
+ neutron, keystone, network_name=self.network_name,
+ project_name=project_name)
+ finally:
+ if session:
+ keystone_utils.close_session(session)
- network = neutron_utils.get_network(
- neutron, network_name=self.network_name, project_id=project_id)
if not network:
raise PortConfigError(
- 'Cannot locate network with name - ' + self.network_name)
+ 'Cannot locate network with name - ' + self.network_name
+ + ' in project - ' + str(project_name))
out['network_id'] = network.id
if self.name:
out['name'] = self.name
if self.project_name:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ project_id = None
+ if project:
+ project_id = project.id
if project_id:
out['tenant_id'] = project_id
else:
if self.mac_address:
out['mac_address'] = self.mac_address
- fixed_ips = self.__get_fixed_ips(neutron)
+ fixed_ips = self.__get_fixed_ips(neutron, network)
if fixed_ips and len(fixed_ips) > 0:
out['fixed_ips'] = fixed_ips
if self.security_groups:
- out['security_groups'] = self.security_groups
+ sec_grp_ids = list()
+ for sec_grp_name in self.security_groups:
+ sec_grp = neutron_utils.get_security_group(
+ neutron, keystone, sec_grp_name=sec_grp_name,
+ project_name=self.project_name)
+ if sec_grp:
+ sec_grp_ids.append(sec_grp.id)
+ out['security_groups'] = sec_grp_ids
+ if self.port_security_enabled is not None:
+ out['port_security_enabled'] = self.port_security_enabled
if self.allowed_address_pairs and len(self.allowed_address_pairs) > 0:
out['allowed_address_pairs'] = self.allowed_address_pairs
if self.opt_value:
out['device_owner'] = self.device_owner
if self.device_id:
out['device_id'] = self.device_id
+ if self.extra_dhcp_opts:
+ out['extra_dhcp_opts'] = self.extra_dhcp_opts
return {'port': out}
def __eq__(self, other):