summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
b8b2c01)
It also adds this support to its dependencies.
Change-Id: I0534f0c7b0e15a9ee89f522f314cf5200874454c
Signed-off-by: Cédric Ollivier <cedric.ollivier@orange.com>
import os
import re
import sys
import os
import re
import sys
import robot.api
from robot.errors import RobotError
import robot.run
from robot.utils.robottime import timestamp_to_secs
import robot.api
from robot.errors import RobotError
import robot.run
from robot.utils.robottime import timestamp_to_secs
+from six.moves import urllib
from functest.core import testcase
import functest.utils.openstack_utils as op_utils
from functest.core import testcase
import functest.utils.openstack_utils as op_utils
try:
for line in fileinput.input(odl_variables_files,
inplace=True):
try:
for line in fileinput.input(odl_variables_files,
inplace=True):
- print re.sub("AUTH = .*",
+ print(re.sub("AUTH = .*",
("AUTH = [u'" + odlusername + "', u'" +
odlpassword + "']"),
("AUTH = [u'" + odlusername + "', u'" +
odlpassword + "']"),
return True
except Exception as ex: # pylint: disable=broad-except
cls.__logger.error("Cannot set ODL creds: %s", str(ex))
return True
except Exception as ex: # pylint: disable=broad-except
cls.__logger.error("Cannot set ODL creds: %s", str(ex))
odlusername = kwargs['odlusername']
odlpassword = kwargs['odlpassword']
osauthurl = kwargs['osauthurl']
odlusername = kwargs['odlusername']
odlpassword = kwargs['odlpassword']
osauthurl = kwargs['osauthurl']
- keystoneip = urlparse.urlparse(osauthurl).hostname
+ keystoneip = urllib.parse.urlparse(osauthurl).hostname
variables = ['KEYSTONE:' + keystoneip,
'NEUTRON:' + kwargs['neutronip'],
'OS_AUTH_URL:"' + osauthurl + '"',
variables = ['KEYSTONE:' + keystoneip,
'NEUTRON:' + kwargs['neutronip'],
'OS_AUTH_URL:"' + osauthurl + '"',
except KeyError:
pass
neutron_url = op_utils.get_endpoint(service_type='network')
except KeyError:
pass
neutron_url = op_utils.get_endpoint(service_type='network')
- kwargs = {'neutronip': urlparse.urlparse(neutron_url).hostname}
+ kwargs = {'neutronip': urllib.parse.urlparse(neutron_url).hostname}
kwargs['odlip'] = kwargs['neutronip']
kwargs['odlwebport'] = '8080'
kwargs['odlrestconfport'] = '8181'
kwargs['odlip'] = kwargs['neutronip']
kwargs['odlwebport'] = '8080'
kwargs['odlrestconfport'] = '8181'
import errno
import logging
import os
import errno
import logging
import os
import unittest
from keystoneauth1.exceptions import auth_plugins
import unittest
from keystoneauth1.exceptions import auth_plugins
from robot.errors import DataError, RobotError
from robot.result import model
from robot.utils.robottime import timestamp_to_secs
from robot.errors import DataError, RobotError
from robot.result import model
from robot.utils.robottime import timestamp_to_secs
from functest.core import testcase
from functest.opnfv_tests.sdn.odl import odl
from functest.core import testcase
from functest.opnfv_tests.sdn.odl import odl
os.path.join(odl.ODLTests.odl_test_repo,
'csit/variables/Variables.py'), inplace=True)
os.path.join(odl.ODLTests.odl_test_repo,
'csit/variables/Variables.py'), inplace=True)
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def _test_set_vars(self, msg1, msg2, *args):
line = mock.MagicMock()
line.__iter__.return_value = [msg1]
def _test_set_vars(self, msg1, msg2, *args):
line = mock.MagicMock()
line.__iter__.return_value = [msg1]
def test_set_vars_auth1(self):
self._test_set_vars("AUTH1 = []", "AUTH1 = []")
def test_set_vars_auth1(self):
self._test_set_vars("AUTH1 = []", "AUTH1 = []")
- @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stdout', new_callable=six.StringIO)
def test_set_vars_auth_foo(self, *args):
line = mock.MagicMock()
line.__iter__.return_value = ["AUTH = []"]
def test_set_vars_auth_foo(self, *args):
line = mock.MagicMock()
line.__iter__.return_value = ["AUTH = []"]
"--odlip={}".format(self._sdn_controller_ip)]),
self.defaultargs)
"--odlip={}".format(self._sdn_controller_ip)]),
self.defaultargs)
- @mock.patch('sys.stderr', new_callable=StringIO.StringIO)
+ @mock.patch('sys.stderr', new_callable=six.StringIO)
def test_fail(self, mock_method):
self.defaultargs['foo'] = 'bar'
with self.assertRaises(SystemExit):
def test_fail(self, mock_method):
self.defaultargs['foo'] = 'bar'
with self.assertRaises(SystemExit):
import os
import time
import unittest
import os
import time
import unittest
from git.exc import NoSuchPathError
import mock
import requests
from git.exc import NoSuchPathError
import mock
import requests
+from six.moves import urllib
from functest.tests.unit import test_utils
from functest.utils import functest_utils
from functest.tests.unit import test_utils
from functest.utils import functest_utils
self.file_yaml = {'general': {'openstack': {'image_name':
'test_image_name'}}}
self.file_yaml = {'general': {'openstack': {'image_name':
'test_image_name'}}}
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_check_internet_connectivity_failed(self, mock_method):
self.assertFalse(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
def test_check_internet_connectivity_failed(self, mock_method):
self.assertFalse(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_default(self, mock_method):
self.assertTrue(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
def test_check_internet_connectivity_default(self, mock_method):
self.assertTrue(functest_utils.check_internet_connectivity())
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_check_internet_connectivity_debian(self, mock_method):
self.url = "https://www.debian.org/"
self.assertTrue(functest_utils.check_internet_connectivity(self.url))
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
def test_check_internet_connectivity_debian(self, mock_method):
self.url = "https://www.debian.org/"
self.assertTrue(functest_utils.check_internet_connectivity(self.url))
mock_method.assert_called_once_with(self.url, timeout=self.timeout)
- @mock.patch('urllib2.urlopen',
- side_effect=urllib2.URLError('no host given'))
+ @mock.patch('six.moves.urllib.request.urlopen',
+ side_effect=urllib.error.URLError('no host given'))
def test_download_url_failed(self, mock_url):
self.assertFalse(functest_utils.download_url(self.url, self.dest_path))
def test_download_url_failed(self, mock_url):
self.assertFalse(functest_utils.download_url(self.url, self.dest_path))
- @mock.patch('urllib2.urlopen')
+ @mock.patch('six.moves.urllib.request.urlopen')
def test_download_url_default(self, mock_url):
def test_download_url_default(self, mock_url):
- with mock.patch("__builtin__.open", mock.mock_open()) as m, \
+ with mock.patch("six.moves.builtins.open", mock.mock_open()) as m, \
mock.patch('functest.utils.functest_utils.shutil.copyfileobj')\
as mock_sh:
name = self.url.rsplit('/')[-1]
mock.patch('functest.utils.functest_utils.shutil.copyfileobj')\
as mock_sh:
name = self.url.rsplit('/')[-1]
attrs = {'readline.side_effect': self.readline_side}
m.configure_mock(**attrs)
attrs = {'readline.side_effect': self.readline_side}
m.configure_mock(**attrs)
- with mock.patch("__builtin__.open") as mo:
+ with mock.patch("six.moves.builtins.open") as mo:
mo.return_value = m
self.assertEqual(functest_utils.get_resolvconf_ns(),
self.test_ip[1:])
mo.return_value = m
self.assertEqual(functest_utils.get_resolvconf_ns(),
self.test_ip[1:])
mock_logger_error):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
mock_logger_error):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
FunctestUtilsTesting.readline = 0
):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
):
with mock.patch('functest.utils.functest_utils.subprocess.Popen') \
as mock_subproc_open, \
- mock.patch('__builtin__.open', mock.mock_open()) as mopen:
+ mock.patch('six.moves.builtins.open',
+ mock.mock_open()) as mopen:
FunctestUtilsTesting.readline = 0
FunctestUtilsTesting.readline = 0
@mock.patch('functest.utils.functest_utils.logger.error')
def test_get_dict_by_test(self, mock_logger_error):
@mock.patch('functest.utils.functest_utils.logger.error')
def test_get_dict_by_test(self, mock_logger_error):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
mock.patch('functest.utils.functest_utils.get_testcases_'
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
mock.patch('functest.utils.functest_utils.get_testcases_'
def test_get_parameter_from_yaml_failed(self):
self.file_yaml['general'] = None
def test_get_parameter_from_yaml_failed(self):
self.file_yaml['general'] = None
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
self.assertRaises(ValueError) as excep:
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml, \
self.assertRaises(ValueError) as excep:
self.parameter) in excep.exception)
def test_get_parameter_from_yaml_default(self):
self.parameter) in excep.exception)
def test_get_parameter_from_yaml_default(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
"functest/ci/testcases.yaml")
def test_get_functest_yaml(self):
"functest/ci/testcases.yaml")
def test_get_functest_yaml(self):
- with mock.patch('__builtin__.open', mock.mock_open()), \
+ with mock.patch('six.moves.builtins.open', mock.mock_open()), \
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
mock.patch('functest.utils.functest_utils.yaml.safe_load') \
as mock_yaml:
mock_yaml.return_value = self.file_yaml
import errno
import functools
import os
import errno
import functools
import os
import mock
import requests.sessions
import mock
import requests.sessions
+from six.moves import urllib
def can_dump_request_to_file(method):
def dump_preparedrequest(request, **kwargs):
# pylint: disable=unused-argument
def can_dump_request_to_file(method):
def dump_preparedrequest(request, **kwargs):
# pylint: disable=unused-argument
- parseresult = urlparse.urlparse(request.url)
+ parseresult = urllib.parse.urlparse(request.url)
if parseresult.scheme == "file":
try:
dirname = os.path.dirname(parseresult.path)
if parseresult.scheme == "file":
try:
dirname = os.path.dirname(parseresult.path)
def patch_request(method, url, **kwargs):
with requests.sessions.Session() as session:
def patch_request(method, url, **kwargs):
with requests.sessions.Session() as session:
- parseresult = urlparse.urlparse(url)
+ parseresult = urllib.parse.urlparse(url)
if parseresult.scheme == "file":
with mock.patch.object(session, 'send',
side_effect=dump_preparedrequest):
if parseresult.scheme == "file":
with mock.patch.object(session, 'send',
side_effect=dump_preparedrequest):
import subprocess
import sys
import time
import subprocess
import sys
import time
from datetime import datetime as dt
import dns.resolver
import requests
from datetime import datetime as dt
import dns.resolver
import requests
+from six.moves import urllib
import yaml
from git import Repo
import yaml
from git import Repo
Check if there is access to the internet
"""
try:
Check if there is access to the internet
"""
try:
- urllib2.urlopen(url, timeout=5)
+ urllib.request.urlopen(url, timeout=5)
- except urllib2.URLError:
+ except urllib.error.URLError:
name = url.rsplit('/')[-1]
dest = dest_path + "/" + name
try:
name = url.rsplit('/')[-1]
dest = dest_path + "/" + name
try:
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
+ response = urllib.request.urlopen(url)
+ except (urllib.error.HTTPError, urllib.error.URLError):
return False
with open(dest, 'wb') as f:
return False
with open(dest, 'wb') as f:
f.write(line)
else:
line = line.replace('\n', '')
f.write(line)
else:
line = line.replace('\n', '')
sys.stdout.flush()
if output_file:
f.close()
sys.stdout.flush()
if output_file:
f.close()
try:
instances = nova_client.servers.list(search_opts={'all_tenants': 1})
return instances
try:
instances = nova_client.servers.list(search_opts={'all_tenants': 1})
return instances
logger.error("Error [get_instances(nova_client)]: %s" % e)
return None
logger.error("Error [get_instances(nova_client)]: %s" % e)
return None
try:
instance = nova_client.servers.get(instance.id)
return instance.status
try:
instance = nova_client.servers.get(instance.id)
return instance.status
logger.error("Error [get_instance_status(nova_client)]: %s" % e)
return None
logger.error("Error [get_instance_status(nova_client)]: %s" % e)
return None
try:
instance = nova_client.servers.find(name=instance_name)
return instance
try:
instance = nova_client.servers.find(name=instance_name)
return instance
logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s"
% (instance_name, e))
return None
logger.error("Error [get_instance_by_name(nova_client, '%s')]: %s"
% (instance_name, e))
return None
try:
aggregates = nova_client.aggregates.list()
return aggregates
try:
aggregates = nova_client.aggregates.list()
return aggregates
logger.error("Error [get_aggregates(nova_client)]: %s" % e)
return None
logger.error("Error [get_aggregates(nova_client)]: %s" % e)
return None
aggregates = get_aggregates(nova_client)
_id = [ag.id for ag in aggregates if ag.name == aggregate_name][0]
return _id
aggregates = get_aggregates(nova_client)
_id = [ag.id for ag in aggregates if ag.name == aggregate_name][0]
return _id
logger.error("Error [get_aggregate_id(nova_client, %s)]:"
" %s" % (aggregate_name, e))
return None
logger.error("Error [get_aggregate_id(nova_client, %s)]:"
" %s" % (aggregate_name, e))
return None
try:
availability_zones = nova_client.availability_zones.list()
return availability_zones
try:
availability_zones = nova_client.availability_zones.list()
return availability_zones
logger.error("Error [get_availability_zones(nova_client)]: %s" % e)
return None
logger.error("Error [get_availability_zones(nova_client)]: %s" % e)
return None
try:
az_names = [az.zoneName for az in get_availability_zones(nova_client)]
return az_names
try:
az_names = [az.zoneName for az in get_availability_zones(nova_client)]
return az_names
logger.error("Error [get_availability_zone_names(nova_client)]:"
" %s" % e)
return None
logger.error("Error [get_availability_zone_names(nova_client)]:"
" %s" % e)
return None
# flavor extra specs are not configured, therefore skip the update
pass
# flavor extra specs are not configured, therefore skip the update
pass
logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
"'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
return None
logger.error("Error [create_flavor(nova_client, '%s', '%s', '%s', "
"'%s')]: %s" % (flavor_name, ram, disk, vcpus, e))
return None
try:
floating_ips = nova_client.floating_ips.list()
return floating_ips
try:
floating_ips = nova_client.floating_ips.list()
return floating_ips
logger.error("Error [get_floating_ips(nova_client)]: %s" % e)
return None
logger.error("Error [get_floating_ips(nova_client)]: %s" % e)
return None
if hypervisor.state == "up":
nodes.append(hypervisor.hypervisor_hostname)
return nodes
if hypervisor.state == "up":
nodes.append(hypervisor.hypervisor_hostname)
return nodes
logger.error("Error [get_hypervisors(nova_client)]: %s" % e)
return None
logger.error("Error [get_hypervisors(nova_client)]: %s" % e)
return None
try:
nova_client.aggregates.create(aggregate_name, av_zone)
return True
try:
nova_client.aggregates.create(aggregate_name, av_zone)
return True
logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, av_zone, e))
return None
logger.error("Error [create_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, av_zone, e))
return None
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.add_host(aggregate_id, compute_host)
return True
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.add_host(aggregate_id, compute_host)
return True
logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, compute_host, e))
return None
logger.error("Error [add_host_to_aggregate(nova_client, %s, %s)]: %s"
% (aggregate_name, compute_host, e))
return None
create_aggregate(nova_client, aggregate_name, av_zone)
add_host_to_aggregate(nova_client, aggregate_name, compute_host)
return True
create_aggregate(nova_client, aggregate_name, av_zone)
add_host_to_aggregate(nova_client, aggregate_name, compute_host)
return True
logger.error("Error [create_aggregate_with_host("
"nova_client, %s, %s, %s)]: %s"
% (aggregate_name, av_zone, compute_host, e))
logger.error("Error [create_aggregate_with_host("
"nova_client, %s, %s, %s)]: %s"
% (aggregate_name, av_zone, compute_host, e))
ip_json = neutron_client.create_floatingip({'floatingip': props})
fip_addr = ip_json['floatingip']['floating_ip_address']
fip_id = ip_json['floatingip']['id']
ip_json = neutron_client.create_floatingip({'floatingip': props})
fip_addr = ip_json['floatingip']['floating_ip_address']
fip_id = ip_json['floatingip']['id']
logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
try:
nova_client.servers.add_floating_ip(server_id, floatingip_addr)
return True
try:
nova_client.servers.add_floating_ip(server_id, floatingip_addr)
return True
logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s"
% (server_id, floatingip_addr, e))
return False
logger.error("Error [add_floating_ip(nova_client, '%s', '%s')]: %s"
% (server_id, floatingip_addr, e))
return False
try:
nova_client.servers.force_delete(instance_id)
return True
try:
nova_client.servers.force_delete(instance_id)
return True
logger.error("Error [delete_instance(nova_client, '%s')]: %s"
% (instance_id, e))
return False
logger.error("Error [delete_instance(nova_client, '%s')]: %s"
% (instance_id, e))
return False
try:
nova_client.floating_ips.delete(floatingip_id)
return True
try:
nova_client.floating_ips.delete(floatingip_id)
return True
logger.error("Error [delete_floating_ip(nova_client, '%s')]: %s"
% (floatingip_id, e))
return False
logger.error("Error [delete_floating_ip(nova_client, '%s')]: %s"
% (floatingip_id, e))
return False
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.remove_host(aggregate_id, compute_host)
return True
aggregate_id = get_aggregate_id(nova_client, aggregate_name)
nova_client.aggregates.remove_host(aggregate_id, compute_host)
return True
logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:"
" %s" % (aggregate_name, compute_host, e))
return False
logger.error("Error [remove_host_from_aggregate(nova_client, %s, %s)]:"
" %s" % (aggregate_name, compute_host, e))
return False
remove_hosts_from_aggregate(nova_client, aggregate_name)
nova_client.aggregates.delete(aggregate_name)
return True
remove_hosts_from_aggregate(nova_client, aggregate_name)
nova_client.aggregates.delete(aggregate_name)
return True
logger.error("Error [delete_aggregate(nova_client, %s)]: %s"
% (aggregate_name, e))
return False
logger.error("Error [delete_aggregate(nova_client, %s)]: %s"
% (aggregate_name, e))
return False
network = neutron_client.create_network(body=json_body)
network_dict = network['network']
return network_dict['id']
network = neutron_client.create_network(body=json_body)
network_dict = network['network']
return network_dict['id']
logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
% (name, e))
return None
logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
% (name, e))
return None
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
try:
subnet = neutron_client.create_subnet(body=json_body)
return subnet['subnets'][0]['id']
logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
try:
router = neutron_client.create_router(json_body)
return router['router']['id']
try:
router = neutron_client.create_router(json_body)
return router['router']['id']
logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
% (name, e))
return None
logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
% (name, e))
return None
try:
port = neutron_client.create_port(body=json_body)
return port['port']['id']
try:
port = neutron_client.create_port(body=json_body)
return port['port']['id']
logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
try:
neutron_client.update_network(network_id, body=json_body)
return True
try:
neutron_client.update_network(network_id, body=json_body)
return True
logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
port = neutron_client.update_port(port=port_id,
body=json_body)
return port['port']['id']
port = neutron_client.update_port(port=port_id,
body=json_body)
return port['port']['id']
logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
try:
neutron_client.add_interface_router(router=router_id, body=json_body)
return True
try:
neutron_client.add_interface_router(router=router_id, body=json_body)
return True
logger.error("Error [add_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
logger.error("Error [add_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
try:
neutron_client.add_gateway_router(router_id, router_dict)
return True
try:
neutron_client.add_gateway_router(router_id, router_dict)
return True
logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
try:
neutron_client.delete_network(network_id)
return True
try:
neutron_client.delete_network(network_id)
return True
logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
% (network_id, e))
return False
logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
% (network_id, e))
return False
try:
neutron_client.delete_subnet(subnet_id)
return True
try:
neutron_client.delete_subnet(subnet_id)
return True
logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
% (subnet_id, e))
return False
logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
% (subnet_id, e))
return False
try:
neutron_client.delete_router(router=router_id)
return True
try:
neutron_client.delete_router(router=router_id)
return True
logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
try:
neutron_client.delete_port(port_id)
return True
try:
neutron_client.delete_port(port_id)
return True
logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
% (port_id, e))
return False
logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
% (port_id, e))
return False
neutron_client.remove_interface_router(router=router_id,
body=json_body)
return True
neutron_client.remove_interface_router(router=router_id,
body=json_body)
return True
logger.error("Error [remove_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
logger.error("Error [remove_interface_router(neutron_client, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
try:
neutron_client.remove_gateway_router(router_id)
return True
try:
neutron_client.remove_gateway_router(router_id)
return True
logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
% (router_id, e))
return False
security_groups = neutron_client.list_security_groups()[
'security_groups']
return security_groups
security_groups = neutron_client.list_security_groups()[
'security_groups']
return security_groups
logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
return None
logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
return None
try:
secgroup = neutron_client.create_security_group(json_body)
return secgroup['security_group']
try:
secgroup = neutron_client.create_security_group(json_body)
return secgroup['security_group']
logger.error("Error [create_security_group(neutron_client, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
logger.error("Error [create_security_group(neutron_client, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
security_rules = [rule for rule in security_rules
if rule["security_group_id"] == sg_id]
return security_rules
security_rules = [rule for rule in security_rules
if rule["security_group_id"] == sg_id]
return security_rules
logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
" %s" % e)
return None
logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
" %s" % e)
return None
return True
else:
return False
return True
else:
return False
logger.error("Error [check_security_group_rules("
" neutron_client, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
logger.error("Error [check_security_group_rules("
" neutron_client, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
try:
nova_client.servers.add_security_group(instance_id, secgroup_id)
return True
try:
nova_client.servers.add_security_group(instance_id, secgroup_id)
return True
logger.error("Error [add_secgroup_to_instance(nova_client, '%s', "
"'%s')]: %s" % (instance_id, secgroup_id, e))
return False
logger.error("Error [add_secgroup_to_instance(nova_client, '%s', "
"'%s')]: %s" % (instance_id, secgroup_id, e))
return False
neutron_client.update_quota(tenant_id=tenant_id,
body=json_body)
return True
neutron_client.update_quota(tenant_id=tenant_id,
body=json_body)
return True
logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
try:
neutron_client.delete_security_group(secgroup_id)
return True
try:
neutron_client.delete_security_group(secgroup_id)
return True
logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
% (secgroup_id, e))
return False
logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
% (secgroup_id, e))
return False
try:
images = nova_client.images.list()
return images
try:
images = nova_client.images.list()
return images
logger.error("Error [get_images]: %s" % e)
return None
logger.error("Error [get_images]: %s" % e)
return None
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return image_id
with open(file_path) as image_data:
glance_client.images.upload(image_id, image_data)
return image_id
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
"'%s')]: %s" % (image_name, file_path, public, e))
return None
logger.error("Error [create_glance_image(glance_client, '%s', '%s', "
"'%s')]: %s" % (image_name, file_path, public, e))
return None
try:
nova_client.images.delete(image_id)
return True
try:
nova_client.images.delete(image_id)
return True
logger.error("Error [delete_glance_image(nova_client, '%s')]: %s"
% (image_id, e))
return False
logger.error("Error [delete_glance_image(nova_client, '%s')]: %s"
% (image_id, e))
return False
try:
volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
return volumes
try:
volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
return volumes
logger.error("Error [get_volumes(cinder_client)]: %s" % e)
return None
logger.error("Error [get_volumes(cinder_client)]: %s" % e)
return None
if not private:
volume_types = [vt for vt in volume_types if vt.is_public]
return volume_types
if not private:
volume_types = [vt for vt in volume_types if vt.is_public]
return volume_types
logger.error("Error [list_volume_types(cinder_client)]: %s" % e)
return None
logger.error("Error [list_volume_types(cinder_client)]: %s" % e)
return None
try:
volume_type = cinder_client.volume_types.create(name)
return volume_type
try:
volume_type = cinder_client.volume_types.create(name)
return volume_type
logger.error("Error [create_volume_type(cinder_client, '%s')]: %s"
% (name, e))
return None
logger.error("Error [create_volume_type(cinder_client, '%s')]: %s"
% (name, e))
return None
try:
cinder_client.quotas.update(tenant_id, **quotas_values)
return True
try:
cinder_client.quotas.update(tenant_id, **quotas_values)
return True
logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
"'%s' '%s')]: %s" % (tenant_id, vols_quota,
snapshots_quota, gigabytes_quota, e))
logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
"'%s' '%s')]: %s" % (tenant_id, vols_quota,
snapshots_quota, gigabytes_quota, e))
else:
cinder_client.volumes.delete(volume_id)
return True
else:
cinder_client.volumes.delete(volume_id)
return True
logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
% (volume_id, str(forced), e))
return False
logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
% (volume_id, str(forced), e))
return False
try:
cinder_client.volume_types.delete(volume_type)
return True
try:
cinder_client.volume_types.delete(volume_type)
return True
logger.error("Error [delete_volume_type(cinder_client, '%s')]: %s"
% (volume_type, e))
return False
logger.error("Error [delete_volume_type(cinder_client, '%s')]: %s"
% (volume_type, e))
return False
else:
tenants = keystone_client.tenants.list()
return tenants
else:
tenants = keystone_client.tenants.list()
return tenants
logger.error("Error [get_tenants(keystone_client)]: %s" % e)
return None
logger.error("Error [get_tenants(keystone_client)]: %s" % e)
return None
try:
users = keystone_client.users.list()
return users
try:
users = keystone_client.users.list()
return users
logger.error("Error [get_users(keystone_client)]: %s" % e)
return None
logger.error("Error [get_users(keystone_client)]: %s" % e)
return None
tenant_description,
enabled=True)
return tenant.id
tenant_description,
enabled=True)
return tenant.id
logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
% (tenant_name, tenant_description, e))
return None
logger.error("Error [create_tenant(keystone_client, '%s', '%s')]: %s"
% (tenant_name, tenant_description, e))
return None
tenant_id,
enabled=True)
return user.id
tenant_id,
enabled=True)
return user.id
logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
"'%s')]: %s" % (user_name, user_password,
user_email, tenant_id, e))
logger.error("Error [create_user(keystone_client, '%s', '%s', '%s'"
"'%s')]: %s" % (user_name, user_password,
user_email, tenant_id, e))
else:
keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
return True
else:
keystone_client.roles.add_user_role(user_id, role_id, tenant_id)
return True
logger.error("Error [add_role_user(keystone_client, '%s', '%s'"
"'%s')]: %s " % (user_id, role_id, tenant_id, e))
return False
logger.error("Error [add_role_user(keystone_client, '%s', '%s'"
"'%s')]: %s " % (user_id, role_id, tenant_id, e))
return False
else:
keystone_client.tenants.delete(tenant_id)
return True
else:
keystone_client.tenants.delete(tenant_id)
return True
logger.error("Error [delete_tenant(keystone_client, '%s')]: %s"
% (tenant_id, e))
return False
logger.error("Error [delete_tenant(keystone_client, '%s')]: %s"
% (tenant_id, e))
return False
try:
keystone_client.users.delete(user_id)
return True
try:
keystone_client.users.delete(user_id)
return True
logger.error("Error [delete_user(keystone_client, '%s')]: %s"
% (user_id, e))
return False
logger.error("Error [delete_user(keystone_client, '%s')]: %s"
% (user_id, e))
return False
try:
resources = heat_client.resources.get(stack_id, resource)
return resources
try:
resources = heat_client.resources.get(stack_id, resource)
return resources
logger.error("Error [get_resource]: %s" % e)
return None
logger.error("Error [get_resource]: %s" % e)
return None
mock==1.3.0
iniparse==0.4
PrettyTable>=0.7.1,<0.8 # BSD
mock==1.3.0
iniparse==0.4
PrettyTable>=0.7.1,<0.8 # BSD
subprocess32==3.2.7
virtualenv==15.1.0
PrettyTable>=0.7.1,<0.8 # BSD
subprocess32==3.2.7
virtualenv==15.1.0
PrettyTable>=0.7.1,<0.8 # BSD