From: Guillermo Herrero Date: Wed, 31 Jan 2018 15:39:02 +0000 (+0100) Subject: generate_config: Move filters to external library X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=commitdiff_plain;h=d085c0ec73dfd804f226fb383509f6592c36236b;p=pharos.git generate_config: Move filters to external library Clean and isolate main script logic from custom filters by moving all them to a external library file. Library has a function to load all the existing filters at once. New enhanced IP handling custom filters: - ipnet_hostaddr - ipnet_hostmin - ipnet_hostmax - ipnet_broadcast - ipnet_netmask - ipnet_contains_ip - ipnet_contains_iprange - ipnet_range_size ipnet filters work with proper IP network with prefixlen mask, ensuring consistent IP calculations and error handling. Previous IP handling filters should be deprecated. Change-Id: I83c41d7ad3c6bd1d9df1deca6cc5b9d2481ecf52 Signed-off-by: Guillermo Herrero --- diff --git a/config/utils/gen_config_lib.py b/config/utils/gen_config_lib.py new file mode 100644 index 00000000..e75387fb --- /dev/null +++ b/config/utils/gen_config_lib.py @@ -0,0 +1,205 @@ +############################################################################## +# Copyright (c) 2018 OPNFV and others. +# +# All rights reserved. This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# http://www.apache.org/licenses/LICENSE-2.0 +############################################################################## +"""Library for generate_config functions and custom jinja2 filters""" + +import logging +from ipaddress import IPv4Network, IPv4Address + + +def load_custom_filters(environment): + """Load all defined filters into the jinja2 enviroment""" + + # TODO deprecate ipaddr_index and netmask for the better ipnet ones + filter_list = { + 'dpkg_arch': filter_dpkg_arch, + 'ipnet_hostaddr': filter_ipnet_hostaddr, + 'ipnet_hostmin': filter_ipnet_hostmin, + 'ipnet_hostmax': filter_ipnet_hostmax, + 'ipnet_broadcast': filter_ipnet_broadcast, + 'ipnet_netmask': filter_ipnet_netmask, + 'ipnet_contains_ip': filter_ipnet_contains_ip, + 'ipnet_contains_iprange': filter_ipnet_contains_iprange, + 'ipnet_range_size': filter_ipnet_range_size, + 'ipaddr_index': filter_ipaddr_index, + 'netmask': filter_netmask + } + + for name, function in filter_list.items(): + environment.filters[name] = function + + +def filter_dpkg_arch(arch, to_dpkg=True): + """Convert DPKG-compatible from processor arch and vice-versa""" + + # Processor architecture (as reported by $(uname -m)) + # vs DPKG architecture mapping + dpkg_arch_table = { + 'aarch64': 'arm64', + 'x86_64': 'amd64', + } + arch_dpkg_table = dict( + zip(dpkg_arch_table.values(), dpkg_arch_table.keys())) + + if to_dpkg: + return dpkg_arch_table[arch] + else: + return arch_dpkg_table[arch] + + +def filter_ipnet_hostaddr(network_cidr, index): + """Return the host IP address on given index from an IP network""" + try: + network_cidr_str = unicode(network_cidr) + except NameError as ex: + network_cidr_str = str(network_cidr) + try: + return IPv4Network(network_cidr_str)[index] + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + except IndexError as ex: + logging.error(network_cidr_str + " has not enough range for " + + str(index) + " host IPs.") + raise + + +def filter_ipnet_broadcast(network_cidr): + """Return broadcast IP address from given IP network""" + try: + network_cidr_str = unicode(network_cidr) + except NameError as ex: + network_cidr_str = str(network_cidr) + try: + return IPv4Network(network_cidr_str).broadcast_address + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_hostmin(network_cidr): + """Return the first host IP address from given IP network""" + try: + network_cidr_str = unicode(network_cidr) + except NameError as ex: + network_cidr_str = str(network_cidr) + try: + return IPv4Network(network_cidr_str)[1] + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_hostmax(network_cidr): + """Return the last host IP address from given IP network""" + try: + network_cidr_str = unicode(network_cidr) + except NameError as ex: + network_cidr_str = str(network_cidr) + try: + return IPv4Network(network_cidr_str)[-2] + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_netmask(network_cidr): + """Return the IP netmask from given IP network""" + try: + network_cidr_str = unicode(network_cidr) + except NameError as ex: + network_cidr_str = str(network_cidr) + try: + return IPv4Network(network_cidr_str).netmask + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_contains_ip(network_cidr, ip_address): + """Check if an IP network cointains a given range""" + try: + network_cidr_str = unicode(network_cidr) + ip_address_str = unicode(ip_address) + except NameError as ex: + network_cidr_str = str(network_cidr) + ip_address_str = str(ip_address) + try: + return IPv4Address(ip_address_str) in IPv4Network(network_cidr_str) + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_contains_iprange(network_cidr, range_start, range_end): + """Check if an IP network cointains a given range""" + try: + network_cidr_str = unicode(network_cidr) + range_start_str = unicode(range_start) + range_end_str = unicode(range_end) + except NameError as ex: + network_cidr_str = str(network_cidr) + range_start_str = str(range_start) + range_end_str = str(range_end) + try: + ipnet = IPv4Network(network_cidr_str) + return (IPv4Address(range_start_str) in ipnet + and IPv4Address(range_end_str) in ipnet) + except ValueError as ex: + logging.error(network_cidr_str + " is not a valid network address") + raise + + +def filter_ipnet_range_size(network_cidr, range_start, range_end): + """Get the size of an IP range between two IP addresses""" + try: + network_cidr_str = unicode(network_cidr) + range_start_str = unicode(range_start) + range_end_str = unicode(range_end) + except NameError as ex: + network_cidr_str = str(network_cidr) + range_start_str = str(range_start) + range_end_str = str(range_end) + try: + ipnet = IPv4Network(network_cidr_str) + ip1 = IPv4Address(range_start_str) + ip2 = IPv4Address(range_end_str) + + if ip1 in ipnet and ip2 in ipnet: + index1 = list(ipnet.hosts()).index(ip1) + index2 = list(ipnet.hosts()).index(ip2) + ip_range_size = index2 - index1 + 1 + return ip_range_size + else: + raise ValueError + except ValueError as ex: + logging.error(range_start_str + " and " + range_end_str + + " are not valid IP addresses for range inside " + + network_cidr_str) + raise + + +# This filter is too simple and does not take network mask into account. +# TODO Deprecate for filter_ipnet_hostaddr +def filter_ipaddr_index(base_address, index): + """Return IP address in given network at given index""" + try: + base_address_str = unicode(base_address) + except NameError as ex: + base_address_str = str(base_address) + return IPv4Address(base_address_str) + int(index) + + +# TODO deprecate for filter_ipnet_netmask +def filter_netmask(prefix): + """Get netmask from prefix length integer""" + try: + prefix_str = unicode(prefix) + except NameError as ex: + prefix_str = str(prefix) + return IPv4Network("1.0.0.0/"+prefix_str).netmask diff --git a/config/utils/generate_config.py b/config/utils/generate_config.py index b2b52f0b..f1c395a5 100755 --- a/config/utils/generate_config.py +++ b/config/utils/generate_config.py @@ -7,60 +7,24 @@ # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## -"""This module does blah blah.""" +"""Generate configuration from PDF/IDF and jinja2 installer template""" + import argparse -import ipaddress import logging import os +from subprocess import CalledProcessError, check_output +import gen_config_lib import yaml from jinja2 import Environment, FileSystemLoader -from subprocess import CalledProcessError, check_output + PARSER = argparse.ArgumentParser() PARSER.add_argument("--yaml", "-y", type=str, required=True) PARSER.add_argument("--jinja2", "-j", type=str, required=True) ARGS = PARSER.parse_args() -# Processor architecture vs DPKG architecture mapping -DPKG_ARCH_TABLE = { - 'aarch64': 'arm64', - 'x86_64': 'amd64', -} -ARCH_DPKG_TABLE = dict(zip(DPKG_ARCH_TABLE.values(), DPKG_ARCH_TABLE.keys())) - -# Custom filter to allow simple IP address operations returning -# a new address from an upper or lower (negative) index -def ipaddr_index(base_address, index): - """Return IP address in given network at given index""" - try: - base_address_str = unicode(base_address) - #pylint: disable=unused-variable - except NameError as ex: - base_address_str = str(base_address) - return ipaddress.ip_address(base_address_str) + int(index) - -# Custom filter to transform a prefix netmask to IP address format netmask -def netmask(prefix): - """Get netmask from prefix length integer""" - try: - prefix_str = unicode(prefix) - except NameError as ex: - prefix_str = str(prefix) - return ipaddress.IPv4Network("1.0.0.0/"+prefix_str).netmask - -# Custom filter to convert between processor architecture -# (as reported by $(uname -m)) and DPKG-style architecture -def dpkg_arch(arch, to_dpkg=True): - """Return DPKG-compatible from processor arch and vice-versa""" - if to_dpkg: - return DPKG_ARCH_TABLE[arch] - else: - return ARCH_DPKG_TABLE[arch] - ENV = Environment(loader=FileSystemLoader(os.path.dirname(ARGS.jinja2))) -ENV.filters['ipaddr_index'] = ipaddr_index -ENV.filters['netmask'] = netmask -ENV.filters['dpkg_arch'] = dpkg_arch +gen_config_lib.load_custom_filters(ENV) # Run `eyaml decrypt` on the whole file, but only if PDF data is encrypted # Note: eyaml return code is 0 even if keys are not available @@ -91,5 +55,5 @@ if os.path.exists(IDF_PATH): # Render template and print generated conf to console TEMPLATE = ENV.get_template(os.path.basename(ARGS.jinja2)) -#pylint: disable=superfluous-parens +# pylint: disable=superfluous-parens print(TEMPLATE.render(conf=DICT))