import yaml
import logging
import ipaddress
-from . import ip_utils
-from .common import constants, utils
+from copy import copy
-class NetworkSettings:
+from . import ip_utils
+from .common import utils
+from .common.constants import (
+ CONTROLLER,
+ COMPUTE,
+ ROLES,
+ DOMAIN_NAME,
+ DNS_SERVERS,
+ NTP_SERVER,
+ ADMIN_NETWORK,
+ EXTERNAL_NETWORK,
+ OPNFV_NETWORK_TYPES,
+)
+
+
+class NetworkSettings(dict):
"""
This class parses APEX network settings yaml file into an object. It
generates or detects all missing fields for deployment.
for deploy.sh consumption. This object will later be used directly as
deployment script move to python.
"""
- def __init__(self, filename, network_isolation):
- with open(filename, 'r') as network_settings_file:
- self.settings_obj = yaml.load(network_settings_file)
- self.network_isolation = network_isolation
- self.enabled_network_list = []
- self.nics = {'compute': dict(), 'controller': dict()}
- self.nics_specified = {'compute': False, 'controller': False}
- self._validate_input()
+ def __init__(self, filename):
+ init_dict = {}
+ if type(filename) is str:
+ with open(filename, 'r') as network_settings_file:
+ init_dict = yaml.safe_load(network_settings_file)
+ else:
+ # assume input is a dict to build from
+ init_dict = filename
+ super().__init__(init_dict)
+
+ if 'apex' in self:
+ # merge two dics Nondestructively
+ def merge(pri, sec):
+ for key, val in sec.items():
+ if key in pri:
+ if type(val) is dict:
+ merge(pri[key], val)
+ # else
+ # do not overwrite what's already there
+ else:
+ pri[key] = val
+ # merge the apex specific config into the first class settings
+ merge(self, copy(self['apex']))
+
+ self.enabled_network_list = []
+ self.nics = {COMPUTE: {}, CONTROLLER: {}}
+ self.nics_specified = {COMPUTE: False, CONTROLLER: False}
+ self._validate_input()
+
+ def get_network(self, network):
+ if network == EXTERNAL_NETWORK and self['networks'][network]:
+ return self['networks'][network][0]
+ else:
+ return self['networks'][network]
def _validate_input(self):
"""
NetworkSettingsException will be raised if validation fails.
"""
- if constants.ADMIN_NETWORK not in self.settings_obj or \
- not utils.str2bool(self.settings_obj[constants.ADMIN_NETWORK].get(
- 'enabled')):
- raise NetworkSettingsException("You must enable admin_network "
- "and configure it explicitly or "
- "use auto-detection")
- if self.network_isolation and \
- (constants.PUBLIC_NETWORK not in self.settings_obj or not
- utils.str2bool(self.settings_obj[constants.PUBLIC_NETWORK].get(
- 'enabled'))):
- raise NetworkSettingsException("You must enable public_network "
+ if not self['networks'].get(ADMIN_NETWORK, {}).get('enabled', False):
+ raise NetworkSettingsException("You must enable admin network "
"and configure it explicitly or "
"use auto-detection")
- for network in constants.OPNFV_NETWORK_TYPES:
- if network in self.settings_obj:
- if utils.str2bool(self.settings_obj[network].get('enabled')):
+ for network in OPNFV_NETWORK_TYPES:
+ if network in self['networks']:
+ _network = self.get_network(network)
+ if _network.get('enabled', True):
logging.info("{} enabled".format(network))
self._config_required_settings(network)
+ if network == EXTERNAL_NETWORK:
+ nicmap = _network['nic_mapping']
+ else:
+ nicmap = _network['nic_mapping']
+ iface = nicmap[CONTROLLER]['members'][0]
self._config_ip_range(network=network,
- setting='usable_ip_range',
+ interface=iface,
+ ip_range='usable_ip_range',
start_offset=21, end_offset=21)
- self._config_optional_settings(network)
self.enabled_network_list.append(network)
self._validate_overcloud_nic_order(network)
+ # TODO self._config_optional_settings(network)
else:
logging.info("{} disabled, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
else:
logging.info("{} is not in specified, will collapse with "
- "admin_network".format(network))
+ "admin network".format(network))
- self.settings_obj['dns_servers'] = self.settings_obj.get(
- 'dns_servers', constants.DNS_SERVERS)
- self.settings_obj['domain_name'] = self.settings_obj.get(
- 'domain_name', constants.DOMAIN_NAME)
+ if 'dns-domain' not in self:
+ self['domain_name'] = DOMAIN_NAME
+ self['dns_servers'] = self.get('dns_nameservers', DNS_SERVERS)
+ self['ntp_servers'] = self.get('ntp', NTP_SERVER)
def _validate_overcloud_nic_order(self, network):
"""
for network
If nic order is specified in a network for a profile, it should be
- specified for every network with that profile other than admin_network
+ specified for every network with that profile other than admin network
Duplicate nic names are also not allowed across different networks
:param network: network to detect if nic order present
:return: None
"""
-
- for role in constants.ROLES:
- interface = role+'_interface'
- nic_index = self.get_enabled_networks().index(network) + 1
- if interface in self.settings_obj[network]:
- if any(y == self.settings_obj[network][interface] for x, y in
- self.nics[role].items()):
- raise NetworkSettingsException("Duplicate {} already "
- "specified for "
- "another network"
- .format(self.settings_obj
- [network]
- [interface]))
- self.nics[role][network] = self.settings_obj[network][
- interface]
+ for role in ROLES:
+ _network = self.get_network(network)
+ _nicmap = _network.get('nic_mapping', {})
+ _role = _nicmap.get(role, {})
+ interfaces = _role.get('members', [])
+
+ if interfaces:
+ interface = interfaces[0]
+ if type(_role.get('vlan', 'native')) is not int and \
+ any(y == interface for x, y in self.nics[role].items()):
+ raise NetworkSettingsException(
+ "Duplicate {} already specified for "
+ "another network".format(interface))
+ self.nics[role][network] = interface
self.nics_specified[role] = True
logging.info("{} nic order specified for network {"
"}".format(role, network))
- elif self.nics_specified[role]:
- logging.error("{} nic order not specified for network {"
- "}".format(role, network))
- raise NetworkSettingsException("Must specify {} for all "
- "enabled networks (other than "
- " admin) or not specify it for "
- "any".format(interface))
else:
- logging.info("{} nic order not specified for network {"
- "}. Will use logical default "
- "nic{}".format(interface, network, nic_index))
- self.nics[role][network] = 'nic' + str(nic_index)
- nic_index += 1
+ raise NetworkSettingsException(
+ "Interface members are not supplied for {} network "
+ "for the {} role. Please add nic assignments"
+ "".format(network, role))
def _config_required_settings(self, network):
"""
given NIC in the system. The resulting config in settings object will
be an ipaddress.network object, replacing the NIC name.
"""
+ _network = self.get_network(network)
# if vlan not defined then default it to native
- if network is not constants.ADMIN_NETWORK:
- if 'vlan' not in self.settings_obj[network]:
- self.settings_obj[network]['vlan'] = 'native'
+ if network is not ADMIN_NETWORK:
+ for role in ROLES:
+ if 'vlan' not in _network['nic_mapping'][role]:
+ _network['nic_mapping'][role]['vlan'] = 'native'
- cidr = self.settings_obj[network].get('cidr')
- nic_name = self.settings_obj[network].get('bridged_interface')
+ cidr = _network.get('cidr')
if cidr:
- cidr = ipaddress.ip_network(self.settings_obj[network]['cidr'])
- self.settings_obj[network]['cidr'] = cidr
+ cidr = ipaddress.ip_network(_network['cidr'])
+ _network['cidr'] = cidr
logging.info("{}_cidr: {}".format(network, cidr))
- return 0
- elif nic_name:
+ elif 'installer_vm' in _network:
+ ucloud_if_list = _network['installer_vm']['members']
# If cidr is not specified, we need to know if we should find
# IPv6 or IPv4 address on the interface
- if utils.str2bool(self.settings_obj[network].get('ipv6')):
- address_family = 6
- else:
- address_family = 4
- nic_interface = ip_utils.get_interface(nic_name, address_family)
- if nic_interface:
- self.settings_obj[network]['bridged_interface'] = nic_interface
+ ip = ipaddress.ip_address(_network['installer_vm']['ip'])
+ nic_if = ip_utils.get_interface(ucloud_if_list[0], ip.version)
+ if nic_if:
+ ucloud_if_list = [nic_if]
logging.info("{}_bridged_interface: {}".
- format(network, nic_interface))
- return 0
+ format(network, nic_if))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "Unable to find valid ip for "
- "interface {}"
- .format(network, nic_name))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: Unable to find valid "
+ "ip for interface {}".format(network, ucloud_if_list[0]))
else:
- raise NetworkSettingsException("Auto detection failed for {}: "
- "either bridge_interface or cidr "
- "must be specified"
- .format(network))
+ raise NetworkSettingsException(
+ "Auto detection failed for {}: either installer_vm "
+ "members or cidr must be specified".format(network))
+
+ # undercloud settings
+ if network == ADMIN_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='dhcp_range',
+ start_offset=2, count=9)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='introspection_range',
+ start_offset=11, count=9)
+ elif network == EXTERNAL_NETWORK:
+ provisioner_ip = _network['installer_vm']['ip']
+ iface = _network['installer_vm']['members'][0]
+ if not provisioner_ip:
+ _network['installer_vm']['ip'] = self._gen_ip(network, 1)
+ self._config_ip_range(network=network, interface=iface,
+ ip_range='floating_ip_range',
+ end_offset=2, count=20)
- def _config_ip_range(self, network, setting, start_offset=None,
- end_offset=None, count=None):
+ gateway = _network['gateway']
+ interface = _network['installer_vm']['ip']
+ self._config_gateway(network, gateway, interface)
+
+ def _config_ip_range(self, network, ip_range, interface=None,
+ start_offset=None, end_offset=None, count=None):
"""
Configures IP range for a given setting.
-
If the setting is already specified, no change will be made.
-
The spec for start_offset, end_offset and count are identical to
ip_utils.get_ip_range.
"""
- ip_range = self.settings_obj[network].get(setting)
- interface = self.settings_obj[network].get('bridged_interface')
-
- if not ip_range:
- cidr = self.settings_obj[network].get('cidr')
- ip_range = ip_utils.get_ip_range(start_offset=start_offset,
- end_offset=end_offset,
- count=count,
- cidr=cidr,
- interface=interface)
- self.settings_obj[network][setting] = ip_range
-
- logging.info("{}_{}: {}".format(network, setting, ip_range))
-
- def _config_ip(self, network, setting, offset):
+ _network = self.get_network(network)
+ if ip_range not in _network:
+ cidr = _network.get('cidr')
+ _ip_range = ip_utils.get_ip_range(start_offset=start_offset,
+ end_offset=end_offset,
+ count=count,
+ cidr=cidr,
+ interface=interface)
+ _network[ip_range] = _ip_range.split(',')
+
+ logging.info("Config IP Range: {} {}".format(network, ip_range))
+
+ def _gen_ip(self, network, offset):
"""
- Configures IP for a given setting.
-
- If the setting is already specified, no change will be made.
-
- The spec for offset is identical to ip_utils.get_ip
+ Generate and ip offset within the given network
"""
- ip = self.settings_obj[network].get(setting)
- interface = self.settings_obj[network].get('bridged_interface')
-
- if not ip:
- cidr = self.settings_obj[network].get('cidr')
- ip = ip_utils.get_ip(offset, cidr, interface)
- self.settings_obj[network][setting] = ip
-
- logging.info("{}_{}: {}".format(network, setting, ip))
+ _network = self.get_network(network)
+ cidr = _network.get('cidr')
+ ip = ip_utils.get_ip(offset, cidr)
+ logging.info("Config IP: {} {}".format(network, ip))
+ return ip
def _config_optional_settings(self, network):
"""
- introspection_range
- public_network:
- provisioner_ip
- - floating_ip
+ - floating_ip_range
- gateway
"""
- if network == constants.ADMIN_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
- self._config_ip_range(network=network, setting='dhcp_range',
+ if network == ADMIN_NETWORK:
+ self._config_ip(network, None, 'provisioner_ip', 1)
+ self._config_ip_range(network=network,
+ ip_range='dhcp_range',
start_offset=2, count=9)
self._config_ip_range(network=network,
- setting='introspection_range',
+ ip_range='introspection_range',
start_offset=11, count=9)
- elif network == constants.PUBLIC_NETWORK:
- self._config_ip(network, 'provisioner_ip', 1)
+ elif network == EXTERNAL_NETWORK:
+ self._config_ip(network, None, 'provisioner_ip', 1)
self._config_ip_range(network=network,
- setting='floating_ip',
+ ip_range='floating_ip_range',
end_offset=2, count=20)
self._config_gateway(network)
- def _config_gateway(self, network):
+ def _config_gateway(self, network, gateway, interface):
"""
Configures gateway setting for a given network.
If cidr is specified, we always use the first address in the address
space for gateway. Otherwise, we detect the system gateway.
"""
- gateway = self.settings_obj[network].get('gateway')
- interface = self.settings_obj[network].get('bridged_interface')
-
+ _network = self.get_network(network)
if not gateway:
- cidr = self.settings_obj[network].get('cidr')
+ cidr = _network.get('cidr')
if cidr:
- gateway = ip_utils.get_ip(1, cidr)
+ _gateway = ip_utils.get_ip(1, cidr)
else:
- gateway = ip_utils.find_gateway(interface)
+ _gateway = ip_utils.find_gateway(interface)
- if gateway:
- self.settings_obj[network]['gateway'] = gateway
+ if _gateway:
+ _network['gateway'] = _gateway
else:
raise NetworkSettingsException("Failed to set gateway")
- logging.info("{}_gateway: {}".format(network, gateway))
+ logging.info("Config Gateway: {} {}".format(network, gateway))
def dump_bash(self, path=None):
"""
If optional path is provided, bash string will be written to the file
instead of stdout.
"""
+ def flatten(name, obj, delim=','):
+ """
+ flatten lists to delim separated strings
+ flatten dics to underscored key names and string values
+ """
+ if type(obj) is list:
+ return "{}=\'{}\'\n".format(name,
+ delim.join(map(lambda x: str(x),
+ obj)))
+ elif type(obj) is dict:
+ flat_str = ''
+ for k in obj:
+ flat_str += flatten("{}_{}".format(name, k), obj[k])
+ return flat_str
+ elif type(obj) is str:
+ return "{}='{}'\n".format(name, obj)
+ else:
+ return "{}={}\n".format(name, str(obj))
+
bash_str = ''
for network in self.enabled_network_list:
- for key, value in self.settings_obj[network].items():
- bash_str += "{}_{}={}\n".format(network, key, value)
- bash_str += "enabled_network_list='{}'\n" \
- .format(' '.join(self.enabled_network_list))
- bash_str += "ip_addr_family={}\n".format(self.get_ip_addr_family())
- dns_list = ""
- for dns_server in self.settings_obj['dns_servers']:
- dns_list = dns_list + "{} ".format(dns_server)
- dns_list = dns_list.strip()
- bash_str += "dns_servers=\'{}\'\n".format(dns_list)
- bash_str += "domain_name=\'{}\'\n".format(self.settings_obj[
- 'domain_name'])
+ _network = self.get_network(network)
+ bash_str += flatten(network, _network)
+ bash_str += flatten('enabled_network_list',
+ self.enabled_network_list, ' ')
+ bash_str += flatten('ip_addr_family', self.get_ip_addr_family())
+ bash_str += flatten('dns_servers', self['dns_servers'], ' ')
+ bash_str += flatten('domain_name', self['dns-domain'], ' ')
+ bash_str += flatten('ntp_server', self['ntp_servers'][0], ' ')
if path:
with open(path, 'w') as file:
file.write(bash_str)
else:
print(bash_str)
- def get_ip_addr_family(self):
+ def get_ip_addr_family(self,):
"""
Returns IP address family for current deployment.
If any enabled network has IPv6 CIDR, the deployment is classified as
IPv6.
"""
- for network in self.enabled_network_list:
- cidr = ipaddress.ip_network(self.settings_obj[network]['cidr'])
- if cidr.version == 6:
- return 6
-
- return 4
-
- def get_network_settings(self):
- """
- Getter for network settings
- :return: network settings dictionary
- """
- return self.settings_obj
-
- def get_enabled_networks(self):
- """
- Getter for enabled network list
- :return: list of enabled networks
- """
- return self.enabled_network_list
+ return max([
+ ipaddress.ip_network(self.get_network(n)['cidr']).version
+ for n in self.enabled_network_list])
class NetworkSettingsException(Exception):