import yaml
import logging
import ipaddress
-from . import ip_utils
-from .common import constants, utils
+from copy import copy
-class NetworkSettings:
+from . import ip_utils
+from .common import utils
+from .common.constants import (
+ CONTROLLER,
+ COMPUTE,
+ ROLES,
+ DOMAIN_NAME,
+ DNS_SERVERS,
+ NTP_SERVER,
+ ADMIN_NETWORK,
+ EXTERNAL_NETWORK,
+ OPNFV_NETWORK_TYPES,
+)
+
+
+class NetworkSettings(dict):
"""
This class parses APEX network settings yaml file into an object. It
generates or detects all missing fields for deployment.
- The resulting object will be used later to generate network environment file
- as well as configuring post deployment networks.
+ The resulting object will be used later to generate network environment
+ file as well as configuring post deployment networks.
Currently the parsed object is dumped into a bash global definition file
for deploy.sh consumption. This object will later be used directly as
deployment script move to python.
"""
- def __init__(self, filename, network_isolation):
- with open(filename, 'r') as network_settings_file:
- self.settings_obj = yaml.load(network_settings_file)
- self.network_isolation = network_isolation
- self.enabled_network_list = []
- self._validate_input()
+ def __init__(self, filename):
+ init_dict = {}
+ if type(filename) is str:
+ with open(filename, 'r') as network_settings_file:
+ init_dict = yaml.safe_load(network_settings_file)
+ else:
+ # assume input is a dict to build from
+ init_dict = filename
+ super().__init__(init_dict)
+
+ if 'apex' in self:
+ # merge two dics Nondestructively
+ def merge(pri, sec):
+ for key, val in sec.items():
+ if key in pri:
+ if type(val) is dict:
+ merge(pri[key], val)
+ # else
+ # do not overwrite what's already there
+ else:
+ pri[key] = val
+ # merge the apex specific config into the first class settings
+ merge(self, copy(self['apex']))
+
+ self.enabled_network_list = []
+ self.nics = {COMPUTE: {}, CONTROLLER: {}}
+ self.nics_specified = {COMPUTE: False, CONTROLLER: False}
+ self._validate_input()
+
+ def get_network(self, network):
+ if network == EXTERNAL_NETWORK and self['networks'][network]:
+ return self['networks'][network][0]
+ else:
+ return self['networks'][network]
def _validate_input(self):
"""
NetworkSettingsException will be raised if validation fails.
"""
- if constants.ADMIN_NETWORK not in self.settings_obj or \
- not utils.str2bool(self.settings_obj[constants.ADMIN_NETWORK].get(
- 'enabled')):
- raise NetworkSettingsException("You must enable admin_network "
- "and configure it explicitly or "
- "use auto-detection")
- if self.network_isolation and \
- (constants.PUBLIC_NETWORK not in self.settings_obj or not
- utils.str2bool(self.settings_obj[constants.PUBLIC_NETWORK].get(
- 'enabled'))):
- raise NetworkSettingsException("You must enable public_network "
+ if not self['networks'].get(ADMIN_NETWORK, {}).get('enabled', False):
+ raise NetworkSettingsException("You must enable admin network "
"and configure it explicitly or "
"use auto-detection")
- for network in constants.OPNFV_NETWORK_TYPES:
- if network in self.settings_obj:
- if utils.str2bool(self.settings_obj[network].get('enabled')):
+ for network in OPNFV_NETWORK_TYPES:
+ if network in self['networks']:
+ _network = self.get_network(network)
+ if _network.get('enabled', True):
logging.info("{} enabled".format(network))
self._config_required_settings(network)
+ if network == EXTERNAL_NETWORK:
+ nicmap = _network['nic_mapping']
+ else:
+ nicmap = _network['nic_mapping']
+ iface = nicmap[CONTROLLER]['members'][0]
self._config_ip_range(network=network,
- setting='usable_ip_range',
+ interface=iface,
+ ip_range='usable_ip_range',
start_offset=21, end_offset=21)
- self._config_optional_settings(network)
self.enabled_network_list.append(network)
+ self._validate_overcloud_nic_order(network)
+ # TODO self._config_optional_settings(network)
else:
logging.info("{} disabled, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
else:
logging.info("{} is not in specified, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
+
+ if 'dns-domain' not in self:
+ self['domain_name'] = DOMAIN_NAME
+ self['dns_servers'] = self.get('dns_nameservers', DNS_SERVERS)
+ self['ntp_servers'] = self.get('ntp', NTP_SERVER)
+
+ def _validate_overcloud_nic_order(self, network):
+ """
+ Detects if nic order is specified per profile (compute/controller)
+ for network
+
+ If nic order is specified in a network for a profile, it should be
+ specified for every network with that profile other than admin network
+
+ Duplicate nic names are also not allowed across different networks
+
+ :param network: network to detect if nic order present
+ :return: None
+ """
+ for role in ROLES:
+ _network = self.get_network(network)
+ _nicmap = _network.get('nic_mapping', {})
+ _role = _nicmap.get(role, {})
+ interfaces = _role.get('members', [])
+
+ if interfaces:
+ interface = interfaces[0]
+ if type(_role.get('vlan', 'native')) is not int and \
+ any(y == interface for x, y in self.nics[role].items()):
+ raise NetworkSettingsException(
+ "Duplicate {} already specified for "
+ "another network".format(interface))
+ self.nics[role][network] = interface
+ self.nics_specified[role] = True
+ logging.info("{} nic order specified for network {"
+ "}".format(role, network))
+ else:
+ raise NetworkSettingsException(
+ "Interface members are not supplied for {} network "
+ "for the {} role. Please add nic assignments"
+ "".format(network, role))
def _config_required_settings(self, network):
"""
given NIC in the system. The resulting config in settings object will
be an ipaddress.network object, replacing the NIC name.
"""
- cidr = self.settings_obj[network].get('cidr')
- nic_name = self.settings_obj[network].get('bridged_interface')
+ _network = self.get_network(network)
+ # if vlan not defined then default it to native
+ if network is not ADMIN_NETWORK:
+ for role in ROLES:
+ if 'vlan' not in _network['nic_mapping'][role]:
+ _network['nic_mapping'][role]['vlan'] = 'native'
+
+ cidr = _network.get('cidr')
if cidr:
- cidr = ipaddress.ip_network(self.settings_obj[network]['cidr'])
- self.settings_obj[network]['cidr'] = cidr
+ cidr = ipaddress.ip_network(_network['cidr'])
+ _network['cidr'] = cidr
logging.info("{}_cidr: {}".format(network, cidr))
- return 0
- elif nic_name:
+ elif 'installer_vm' in _network:
+ ucloud_if_list = _network['installer_vm']['members']
# If cidr is not specified, we need to know if we should find
# IPv6 or IPv4 address on the interface
- if utils.str2bool(self.settings_obj[network].get('ipv6')):
- address_family = 6
- else:
- address_family = 4
- nic_interface = ip_utils.get_interface(nic_name, address_family)
- if nic_interface:
- self.settings_obj[network]['bridged_interface'] = nic_interface
+ ip = ipaddress.ip_address(_network['installer_vm']['ip'])
+ nic_if = ip_utils.get_interface(ucloud_if_list[0], ip.version)
+ if nic_if:
+ ucloud_if_list = [nic_if]
logging.info("{}_bridged_interface: {}".
- format(network, nic_interface))
- return 0
+ format(network, nic_if))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "Unable to find valid ip for "
- "interface {}"
- .format(network, nic_name))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: Unable to find valid "
+ "ip for interface {}".format(network, ucloud_if_list[0]))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "either bridge_interface or cidr "
- "must be specified"
- .format(network))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: either installer_vm "
+ "members or cidr must be specified".format(network))
+
+ # undercloud settings
+ if network == ADMIN_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='dhcp_range',
+ start_offset=2, count=9)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='introspection_range',
+ start_offset=11, count=9)
+ elif network == EXTERNAL_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='floating_ip_range',
+ end_offset=2, count=20)
+
+ gateway = _network['gateway']
+ interface = _network['installer_vm']['ip']
+ self._config_gateway(network, gateway, interface)
- def _config_ip_range(self, network, setting, start_offset=None,
- end_offset=None, count=None):
+ def _config_ip_range(self, network, ip_range, interface=None,
+ start_offset=None, end_offset=None, count=None):
"""
Configures IP range for a given setting.
-
If the setting is already specified, no change will be made.
-
The spec for start_offset, end_offset and count are identical to
ip_utils.get_ip_range.
"""
- ip_range = self.settings_obj[network].get(setting)
- interface = self.settings_obj[network].get('bridged_interface')
-
- if not ip_range:
- cidr = self.settings_obj[network].get('cidr')
- ip_range = ip_utils.get_ip_range(start_offset=start_offset,
- end_offset=end_offset,
- count=count,
- cidr=cidr,
- interface=interface)
- self.settings_obj[network][setting] = ip_range
-
- logging.info("{}_{}: {}".format(network, setting, ip_range))
-
- def _config_ip(self, network, setting, offset):
+ _network = self.get_network(network)
+ if ip_range not in _network:
+ cidr = _network.get('cidr')
+ _ip_range = ip_utils.get_ip_range(start_offset=start_offset,
+ end_offset=end_offset,
+ count=count,
+ cidr=cidr,
+ interface=interface)
+ _network[ip_range] = _ip_range.split(',')
+
+ logging.info("Config IP Range: {} {}".format(network, ip_range))
+
+ def _gen_ip(self, network, offset):
"""
- Configures IP for a given setting.
-
- If the setting is already specified, no change will be made.
-
- The spec for offset is identical to ip_utils.get_ip
+ Generate and ip offset within the given network
"""
- ip = self.settings_obj[network].get(setting)
- interface = self.settings_obj[network].get('bridged_interface')
-
- if not ip:
- cidr = self.settings_obj[network].get('cidr')
- ip = ip_utils.get_ip(offset, cidr, interface)
- self.settings_obj[network][setting] = ip
-
- logging.info("{}_{}: {}".format(network, setting, ip))
+ _network = self.get_network(network)
+ cidr = _network.get('cidr')
+ ip = ip_utils.get_ip(offset, cidr)
+ logging.info("Config IP: {} {}".format(network, ip))
+ return ip
def _config_optional_settings(self, network):
"""
- introspection_range
- public_network:
- provisioner_ip
- - floating_ip
+ - floating_ip_range
- gateway
"""
- if network == constants.ADMIN_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
- self._config_ip_range(network=network, setting='dhcp_range',
+ if network == ADMIN_NETWORK:
+ self._config_ip(network, None, 'provisioner_ip', 1)
+ self._config_ip_range(network=network,
+ ip_range='dhcp_range',
start_offset=2, count=9)
self._config_ip_range(network=network,
- setting='introspection_range',
+ ip_range='introspection_range',
start_offset=11, count=9)
- elif network == constants.PUBLIC_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
+ elif network == EXTERNAL_NETWORK:
+ self._config_ip(network, None, 'provisioner_ip', 1)
self._config_ip_range(network=network,
- setting='floating_ip',
+ ip_range='floating_ip_range',
end_offset=2, count=20)
self._config_gateway(network)
- def _config_gateway(self, network):
+ def _config_gateway(self, network, gateway, interface):
"""
Configures gateway setting for a given network.
If cidr is specified, we always use the first address in the address
space for gateway. Otherwise, we detect the system gateway.
"""
- gateway = self.settings_obj[network].get('gateway')
- interface = self.settings_obj[network].get('bridged_interface')
-
+ _network = self.get_network(network)
if not gateway:
- cidr = self.settings_obj[network].get('cidr')
+ cidr = _network.get('cidr')
if cidr:
- gateway = ip_utils.get_ip(1, cidr)
+ _gateway = ip_utils.get_ip(1, cidr)
else:
- gateway = ip_utils.find_gateway(interface)
+ _gateway = ip_utils.find_gateway(interface)
- if gateway:
- self.settings_obj[network]['gateway'] = gateway
+ if _gateway:
+ _network['gateway'] = _gateway
else:
raise NetworkSettingsException("Failed to set gateway")
- logging.info("{}_gateway: {}".format(network, gateway))
+ logging.info("Config Gateway: {} {}".format(network, gateway))
def dump_bash(self, path=None):
"""
If optional path is provided, bash string will be written to the file
instead of stdout.
"""
+ def flatten(name, obj, delim=','):
+ """
+ flatten lists to delim separated strings
+ flatten dics to underscored key names and string values
+ """
+ if type(obj) is list:
+ return "{}=\'{}\'\n".format(name,
+ delim.join(map(lambda x: str(x),
+ obj)))
+ elif type(obj) is dict:
+ flat_str = ''
+ for k in obj:
+ flat_str += flatten("{}_{}".format(name, k), obj[k])
+ return flat_str
+ elif type(obj) is str:
+ return "{}='{}'\n".format(name, obj)
+ else:
+ return "{}={}\n".format(name, str(obj))
+
bash_str = ''
for network in self.enabled_network_list:
- for key, value in self.settings_obj[network].items():
- bash_str += "{}_{}={}\n".format(network, key, value)
- bash_str += "enabled_network_list='{}'\n" \
- .format(' '.join(self.enabled_network_list))
- bash_str += "ip_addr_family={}\n".format(self.get_ip_addr_family())
+ _network = self.get_network(network)
+ bash_str += flatten(network, _network)
+ bash_str += flatten('enabled_network_list',
+ self.enabled_network_list, ' ')
+ bash_str += flatten('ip_addr_family', self.get_ip_addr_family())
+ bash_str += flatten('dns_servers', self['dns_servers'], ' ')
+ bash_str += flatten('domain_name', self['dns-domain'], ' ')
+ bash_str += flatten('ntp_server', self['ntp_servers'][0], ' ')
if path:
with open(path, 'w') as file:
file.write(bash_str)
else:
print(bash_str)
- def get_ip_addr_family(self):
+ def get_ip_addr_family(self,):
"""
Returns IP address family for current deployment.
If any enabled network has IPv6 CIDR, the deployment is classified as
IPv6.
"""
- for network in self.enabled_network_list:
- cidr = ipaddress.ip_network(self.settings_obj[network]['cidr'])
- if cidr.version == 6:
- return 6
-
- return 4
-
- def get_network_settings(self):
- """
- Getter for network settings
- :return: network settings dictionary
- """
- return self.settings_obj
-
- def get_enabled_networks(self):
- """
- Getter for enabled network list
- :return: list of enabled networks
- """
- return self.enabled_network_list
+ return max([
+ ipaddress.ip_network(self.get_network(n)['cidr']).version
+ for n in self.enabled_network_list])
class NetworkSettingsException(Exception):
def __str__(self):
return self.value
-
-
-