return flavor_exists, flavor_id
-def get_floating_ips(neutron_client):
+def get_floating_ips(conn):
try:
- floating_ips = neutron_client.list_floatingips()
- return floating_ips['floatingips']
+ floating_ips = conn.network.ips()
+ return floating_ips
except Exception as e:
- logger.error("Error [get_floating_ips(neutron_client)]: %s" % e)
+ logger.error("Error [get_floating_ips(network)]: %s" % e)
return None
return None
-def create_floating_ip(neutron_client):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id}
+def create_floating_ip(conn):
+ extnet_id = get_external_net_id(conn)
try:
- ip_json = neutron_client.create_floatingip({'floatingip': props})
- fip_addr = ip_json['floatingip']['floating_ip_address']
- fip_id = ip_json['floatingip']['id']
+ fip = conn.network.create_ip(floating_network_id=extnet_id)
+ fip_addr = fip.floating_ip_address
+ fip_id = fip.id
except Exception as e:
- logger.error("Error [create_floating_ip(neutron_client)]: %s" % e)
+ logger.error("Error [create_floating_ip(network)]: %s" % e)
return None
return {'fip_addr': fip_addr, 'fip_id': fip_id}
-def attach_floating_ip(neutron_client, port_id):
- extnet_id = get_external_net_id(neutron_client)
- props = {'floating_network_id': extnet_id,
- 'port_id': port_id}
+def attach_floating_ip(conn, port_id):
+ extnet_id = get_external_net_id(conn)
try:
- return neutron_client.create_floatingip({'floatingip': props})
+ return conn.network.create_ip(floating_network_id=extnet_id,
+ port_id=port_id)
except Exception as e:
- logger.error("Error [Attach_floating_ip(neutron_client), %s]: %s"
+ logger.error("Error [Attach_floating_ip(network), %s]: %s"
% (port_id, e))
return None
return False
-def delete_floating_ip(neutron_client, floatingip_id):
+def delete_floating_ip(conn, floatingip_id):
try:
- neutron_client.delete_floatingip(floatingip_id)
+ conn.network.delete_ip(floatingip_id)
return True
except Exception as e:
- logger.error("Error [delete_floating_ip(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_floating_ip(network, '%s')]: %s"
% (floatingip_id, e))
return False
# *********************************************
# NEUTRON
# *********************************************
-def get_network_list(neutron_client):
- network_list = neutron_client.list_networks()['networks']
- if len(network_list) == 0:
- return None
- else:
- return network_list
+def get_network_list(conn):
+ return conn.network.networks()
-def get_router_list(neutron_client):
- router_list = neutron_client.list_routers()['routers']
- if len(router_list) == 0:
- return None
- else:
- return router_list
+def get_router_list(conn):
+ return conn.network.routers()
-def get_port_list(neutron_client):
- port_list = neutron_client.list_ports()['ports']
- if len(port_list) == 0:
- return None
- else:
- return port_list
+def get_port_list(conn):
+ return conn.network.ports()
-def get_network_id(neutron_client, network_name):
- networks = neutron_client.list_networks()['networks']
+def get_network_id(conn, network_name):
+ networks = conn.network.networks()
id = ''
for n in networks:
- if n['name'] == network_name:
- id = n['id']
+ if n.name == network_name:
+ id = n.id
break
return id
-def get_subnet_id(neutron_client, subnet_name):
- subnets = neutron_client.list_subnets()['subnets']
+def get_subnet_id(conn, subnet_name):
+ subnets = conn.network.subnets()
id = ''
for s in subnets:
- if s['name'] == subnet_name:
- id = s['id']
+ if s.name == subnet_name:
+ id = s.id
break
return id
-def get_router_id(neutron_client, router_name):
- routers = neutron_client.list_routers()['routers']
+def get_router_id(conn, router_name):
+ routers = conn.network.routers()
id = ''
for r in routers:
- if r['name'] == router_name:
- id = r['id']
+ if r.name == router_name:
+ id = r.id
break
return id
-def get_private_net(neutron_client):
+def get_private_net(conn):
# Checks if there is an existing shared private network
- networks = neutron_client.list_networks()['networks']
- if len(networks) == 0:
- return None
+ networks = conn.network.networks()
for net in networks:
- if (net['router:external'] is False) and (net['shared'] is True):
+ if (net.is_router_external is False) and (net.is_shared is True):
return net
return None
-def get_external_net(neutron_client):
+def get_external_net(conn):
if (env.get('EXTERNAL_NETWORK')):
return env.get('EXTERNAL_NETWORK')
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['name']
+ for network in conn.network.networks():
+ if network.is_router_external:
+ return network.name
return None
-def get_external_net_id(neutron_client):
+def get_external_net_id(conn):
if (env.get('EXTERNAL_NETWORK')):
- networks = neutron_client.list_networks(
- name=env.get('EXTERNAL_NETWORK'))
- net_id = networks['networks'][0]['id']
+ networks = conn.network.networks(name=env.get('EXTERNAL_NETWORK'))
+ net_id = networks.next().id
return net_id
- for network in neutron_client.list_networks()['networks']:
- if network['router:external']:
- return network['id']
+ for network in conn.network.networks():
+ if network.is_router_external:
+ return network.id
return None
-def check_neutron_net(neutron_client, net_name):
- for network in neutron_client.list_networks()['networks']:
- if network['name'] == net_name:
- for subnet in network['subnets']:
+def check_neutron_net(conn, net_name):
+ for network in conn.network.networks():
+ if network.name == net_name:
+ for subnet in network.subnet_ids:
return True
return False
-def create_neutron_net(neutron_client, name):
- json_body = {'network': {'name': name,
- 'admin_state_up': True}}
+def create_neutron_net(conn, name):
try:
- network = neutron_client.create_network(body=json_body)
- network_dict = network['network']
- return network_dict['id']
+ network = conn.network.create_network(name=name)
+ return network.id
except Exception as e:
- logger.error("Error [create_neutron_net(neutron_client, '%s')]: %s"
+ logger.error("Error [create_neutron_net(network, '%s')]: %s"
% (name, e))
return None
-def create_neutron_subnet(neutron_client, name, cidr, net_id,
+def create_neutron_subnet(conn, name, cidr, net_id,
dns=['8.8.8.8', '8.8.4.4']):
- json_body = {'subnets': [{'name': name, 'cidr': cidr,
- 'ip_version': 4, 'network_id': net_id,
- 'dns_nameservers': dns}]}
-
try:
- subnet = neutron_client.create_subnet(body=json_body)
- return subnet['subnets'][0]['id']
+ subnet = conn.network.create_subnet(name=name,
+ cidr=cidr,
+ ip_version='4',
+ network_id=net_id,
+ dns_nameservers=dns)
+ return subnet.id
except Exception as e:
- logger.error("Error [create_neutron_subnet(neutron_client, '%s', "
+ logger.error("Error [create_neutron_subnet(network, '%s', "
"'%s', '%s')]: %s" % (name, cidr, net_id, e))
return None
-def create_neutron_router(neutron_client, name):
- json_body = {'router': {'name': name, 'admin_state_up': True}}
+def create_neutron_router(conn, name):
try:
- router = neutron_client.create_router(json_body)
- return router['router']['id']
+ router = conn.network.create_router(name=name)
+ return router.id
except Exception as e:
- logger.error("Error [create_neutron_router(neutron_client, '%s')]: %s"
+ logger.error("Error [create_neutron_router(network, '%s')]: %s"
% (name, e))
return None
-def create_neutron_port(neutron_client, name, network_id, ip):
- json_body = {'port': {
- 'admin_state_up': True,
- 'name': name,
- 'network_id': network_id,
- 'fixed_ips': [{"ip_address": ip}]
- }}
+def create_neutron_port(conn, name, network_id, ip):
try:
- port = neutron_client.create_port(body=json_body)
- return port['port']['id']
+ port = conn.network.create_port(name=name,
+ network_id=network_id,
+ fixed_ips=[{'ip_address': ip}])
+ return port.id
except Exception as e:
- logger.error("Error [create_neutron_port(neutron_client, '%s', '%s', "
+ logger.error("Error [create_neutron_port(network, '%s', '%s', "
"'%s')]: %s" % (name, network_id, ip, e))
return None
-def update_neutron_net(neutron_client, network_id, shared=False):
- json_body = {'network': {'shared': shared}}
+def update_neutron_net(conn, network_id, shared=False):
try:
- neutron_client.update_network(network_id, body=json_body)
+ conn.network.update_network(network_id, is_shared=shared)
return True
except Exception as e:
- logger.error("Error [update_neutron_net(neutron_client, '%s', '%s')]: "
+ logger.error("Error [update_neutron_net(network, '%s', '%s')]: "
"%s" % (network_id, str(shared), e))
return False
-def update_neutron_port(neutron_client, port_id, device_owner):
- json_body = {'port': {
- 'device_owner': device_owner,
- }}
+def update_neutron_port(conn, port_id, device_owner):
try:
- port = neutron_client.update_port(port=port_id,
- body=json_body)
- return port['port']['id']
+ port = conn.network.update_port(port_id, device_owner=device_owner)
+ return port.id
except Exception as e:
- logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
+ logger.error("Error [update_neutron_port(network, '%s', '%s')]:"
" %s" % (port_id, device_owner, e))
return None
-def add_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
+def add_interface_router(conn, router_id, subnet_id):
try:
- neutron_client.add_interface_router(router=router_id, body=json_body)
+ conn.network.add_interface_to_router(router_id, subnet_id=subnet_id)
return True
except Exception as e:
- logger.error("Error [add_interface_router(neutron_client, '%s', "
+ logger.error("Error [add_interface_router(network, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
-def add_gateway_router(neutron_client, router_id):
- ext_net_id = get_external_net_id(neutron_client)
+def add_gateway_router(conn, router_id):
+ ext_net_id = get_external_net_id(conn)
router_dict = {'network_id': ext_net_id}
try:
- neutron_client.add_gateway_router(router_id, router_dict)
+ conn.network.update_router(router_id,
+ external_gateway_info=router_dict)
return True
except Exception as e:
- logger.error("Error [add_gateway_router(neutron_client, '%s')]: %s"
+ logger.error("Error [add_gateway_router(network, '%s')]: %s"
% (router_id, e))
return False
-def delete_neutron_net(neutron_client, network_id):
+def delete_neutron_net(conn, network_id):
try:
- neutron_client.delete_network(network_id)
+ conn.network.delete_network(network_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_net(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_net(network, '%s')]: %s"
% (network_id, e))
return False
-def delete_neutron_subnet(neutron_client, subnet_id):
+def delete_neutron_subnet(conn, subnet_id):
try:
- neutron_client.delete_subnet(subnet_id)
+ conn.network.delete_subnet(subnet_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_subnet(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_subnet(network, '%s')]: %s"
% (subnet_id, e))
return False
-def delete_neutron_router(neutron_client, router_id):
+def delete_neutron_router(conn, router_id):
try:
- neutron_client.delete_router(router=router_id)
+ conn.network.delete_router(router_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_router(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_router(network, '%s')]: %s"
% (router_id, e))
return False
-def delete_neutron_port(neutron_client, port_id):
+def delete_neutron_port(conn, port_id):
try:
- neutron_client.delete_port(port_id)
+ conn.network.delete_port(port_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_neutron_port(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_neutron_port(network, '%s')]: %s"
% (port_id, e))
return False
-def remove_interface_router(neutron_client, router_id, subnet_id):
- json_body = {"subnet_id": subnet_id}
+def remove_interface_router(conn, router_id, subnet_id):
try:
- neutron_client.remove_interface_router(router=router_id,
- body=json_body)
+ conn.network.remove_interface_from_router(router_id,
+ subnet_id=subnet_id)
return True
except Exception as e:
- logger.error("Error [remove_interface_router(neutron_client, '%s', "
+ logger.error("Error [remove_interface_router(network, '%s', "
"'%s')]: %s" % (router_id, subnet_id, e))
return False
-def remove_gateway_router(neutron_client, router_id):
+def remove_gateway_router(conn, router_id):
try:
- neutron_client.remove_gateway_router(router_id)
+ conn.network.update_router(router_id, external_gateway_info=None)
return True
except Exception as e:
- logger.error("Error [remove_gateway_router(neutron_client, '%s')]: %s"
+ logger.error("Error [remove_gateway_router(network, '%s')]: %s"
% (router_id, e))
return False
-def create_network_full(neutron_client,
+def create_network_full(conn,
net_name,
subnet_name,
router_name,
dns=['8.8.8.8', '8.8.4.4']):
# Check if the network already exists
- network_id = get_network_id(neutron_client, net_name)
- subnet_id = get_subnet_id(neutron_client, subnet_name)
- router_id = get_router_id(neutron_client, router_name)
+ network_id = get_network_id(conn, net_name)
+ subnet_id = get_subnet_id(conn, subnet_name)
+ router_id = get_router_id(conn, router_name)
if network_id != '' and subnet_id != '' and router_id != '':
logger.info("A network with name '%s' already exists..." % net_name)
else:
- neutron_client.format = 'json'
-
logger.info('Creating neutron network %s...' % net_name)
if network_id == '':
- network_id = create_neutron_net(neutron_client, net_name)
+ network_id = create_neutron_net(conn, net_name)
if not network_id:
return False
logger.debug("Network '%s' created successfully" % network_id)
logger.debug('Creating Subnet....')
if subnet_id == '':
- subnet_id = create_neutron_subnet(neutron_client, subnet_name,
- cidr, network_id, dns)
+ subnet_id = create_neutron_subnet(conn, subnet_name, cidr,
+ network_id, dns)
if not subnet_id:
return None
logger.debug("Subnet '%s' created successfully" % subnet_id)
logger.debug('Creating Router...')
if router_id == '':
- router_id = create_neutron_router(neutron_client, router_name)
+ router_id = create_neutron_router(conn, router_name)
if not router_id:
return None
logger.debug("Router '%s' created successfully" % router_id)
logger.debug('Adding router to subnet...')
- if not add_interface_router(neutron_client, router_id, subnet_id):
+ if not add_interface_router(conn, router_id, subnet_id):
return None
logger.debug("Interface added successfully.")
logger.debug('Adding gateway to router...')
- if not add_gateway_router(neutron_client, router_id):
+ if not add_gateway_router(conn, router_id):
return None
logger.debug("Gateway added successfully.")
def create_shared_network_full(net_name, subnt_name, router_name, subnet_cidr):
- neutron_client = get_neutron_client()
+ conn = get_os_connection()
- network_dic = create_network_full(neutron_client,
+ network_dic = create_network_full(conn,
net_name,
subnt_name,
router_name,
subnet_cidr)
if network_dic:
- if not update_neutron_net(neutron_client,
+ if not update_neutron_net(conn,
network_dic['net_id'],
shared=True):
logger.error("Failed to update network %s..." % net_name)
# *********************************************
-def get_security_groups(neutron_client):
- try:
- security_groups = neutron_client.list_security_groups()[
- 'security_groups']
- return security_groups
- except Exception as e:
- logger.error("Error [get_security_groups(neutron_client)]: %s" % e)
- return None
+def get_security_groups(conn):
+ return conn.network.security_groups()
-def get_security_group_id(neutron_client, sg_name):
- security_groups = get_security_groups(neutron_client)
+def get_security_group_id(conn, sg_name):
+ security_groups = get_security_groups(conn)
id = ''
for sg in security_groups:
- if sg['name'] == sg_name:
- id = sg['id']
+ if sg.name == sg_name:
+ id = sg.id
break
return id
-def create_security_group(neutron_client, sg_name, sg_description):
- json_body = {'security_group': {'name': sg_name,
- 'description': sg_description}}
+def create_security_group(conn, sg_name, sg_description):
try:
- secgroup = neutron_client.create_security_group(json_body)
- return secgroup['security_group']
+ secgroup = conn.network.\
+ create_security_group(name=sg_name, description=sg_description)
+ return secgroup
except Exception as e:
- logger.error("Error [create_security_group(neutron_client, '%s', "
+ logger.error("Error [create_security_group(network, '%s', "
"'%s')]: %s" % (sg_name, sg_description, e))
return None
-def create_secgroup_rule(neutron_client, sg_id, direction, protocol,
+def create_secgroup_rule(conn, sg_id, direction, protocol,
port_range_min=None, port_range_max=None):
# We create a security group in 2 steps
- # 1 - we check the format and set the json body accordingly
- # 2 - we call neturon client to create the security group
+ # 1 - we check the format and set the secgroup rule attributes accordingly
+ # 2 - we call openstacksdk to create the security group
# Format check
- json_body = {'security_group_rule': {'direction': direction,
- 'security_group_id': sg_id,
- 'protocol': protocol}}
+ secgroup_rule_attrs = {'direction': direction,
+ 'security_group_id': sg_id,
+ 'protocol': protocol}
# parameters may be
# - both None => we do nothing
- # - both Not None => we add them to the json description
+ # - both Not None => we add them to the secgroup rule attributes
# but one cannot be None is the other is not None
if (port_range_min is not None and port_range_max is not None):
- # add port_range in json description
- json_body['security_group_rule']['port_range_min'] = port_range_min
- json_body['security_group_rule']['port_range_max'] = port_range_max
+ # add port_range in secgroup rule attributes
+ secgroup_rule_attrs['port_range_min'] = port_range_min
+ secgroup_rule_attrs['port_range_max'] = port_range_max
logger.debug("Security_group format set (port range included)")
else:
# either both port range are set to None => do nothing
# Create security group using neutron client
try:
- neutron_client.create_security_group_rule(json_body)
+ conn.network.create_security_group_rule(**secgroup_rule_attrs)
return True
except:
logger.exception("Impossible to create_security_group_rule,"
return False
-def get_security_group_rules(neutron_client, sg_id):
+def get_security_group_rules(conn, sg_id):
try:
- security_rules = neutron_client.list_security_group_rules()[
- 'security_group_rules']
+ security_rules = conn.network.security_group_rules()
security_rules = [rule for rule in security_rules
- if rule["security_group_id"] == sg_id]
+ if rule.security_group_id == sg_id]
return security_rules
except Exception as e:
- logger.error("Error [get_security_group_rules(neutron_client, sg_id)]:"
+ logger.error("Error [get_security_group_rules(network, sg_id)]:"
" %s" % e)
return None
-def check_security_group_rules(neutron_client, sg_id, direction, protocol,
+def check_security_group_rules(conn, sg_id, direction, protocol,
port_min=None, port_max=None):
try:
- security_rules = get_security_group_rules(neutron_client, sg_id)
+ security_rules = get_security_group_rules(conn, sg_id)
security_rules = [rule for rule in security_rules
- if (rule["direction"].lower() == direction and
- rule["protocol"].lower() == protocol and
- rule["port_range_min"] == port_min and
- rule["port_range_max"] == port_max)]
+ if (rule.direction.lower() == direction and
+ rule.protocol.lower() == protocol and
+ rule.port_range_min == port_min and
+ rule.port_range_max == port_max)]
if len(security_rules) == 0:
return True
else:
return False
except Exception as e:
logger.error("Error [check_security_group_rules("
- " neutron_client, sg_id, direction,"
+ " network, sg_id, direction,"
" protocol, port_min=None, port_max=None)]: "
"%s" % e)
return None
-def create_security_group_full(neutron_client,
+def create_security_group_full(conn,
sg_name, sg_description):
- sg_id = get_security_group_id(neutron_client, sg_name)
+ sg_id = get_security_group_id(conn, sg_name)
if sg_id != '':
logger.info("Using existing security group '%s'..." % sg_name)
else:
logger.info("Creating security group '%s'..." % sg_name)
- SECGROUP = create_security_group(neutron_client,
+ SECGROUP = create_security_group(conn,
sg_name,
sg_description)
if not SECGROUP:
logger.error("Failed to create the security group...")
return None
- sg_id = SECGROUP['id']
+ sg_id = SECGROUP.id
logger.debug("Security group '%s' with ID=%s created successfully."
- % (SECGROUP['name'], sg_id))
+ % (SECGROUP.name, sg_id))
logger.debug("Adding ICMP rules in security group '%s'..."
% sg_name)
- if not create_secgroup_rule(neutron_client, sg_id,
+ if not create_secgroup_rule(conn, sg_id,
'ingress', 'icmp'):
logger.error("Failed to create the security group rule...")
return None
logger.debug("Adding SSH rules in security group '%s'..."
% sg_name)
if not create_secgroup_rule(
- neutron_client, sg_id, 'ingress', 'tcp', '22', '22'):
+ conn, sg_id, 'ingress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return None
if not create_secgroup_rule(
- neutron_client, sg_id, 'egress', 'tcp', '22', '22'):
+ conn, sg_id, 'egress', 'tcp', '22', '22'):
logger.error("Failed to create the security group rule...")
return None
return sg_id
return False
-def update_sg_quota(neutron_client, tenant_id, sg_quota, sg_rule_quota):
- json_body = {"quota": {
- "security_group": sg_quota,
- "security_group_rule": sg_rule_quota
- }}
-
+def update_sg_quota(conn, tenant_id, sg_quota, sg_rule_quota):
try:
- neutron_client.update_quota(tenant_id=tenant_id,
- body=json_body)
+ conn.network.update_quota(tenant_id,
+ security_group_rules=sg_rule_quota,
+ security_groups=sg_quota)
return True
except Exception as e:
- logger.error("Error [update_sg_quota(neutron_client, '%s', '%s', "
+ logger.error("Error [update_sg_quota(network, '%s', '%s', "
"'%s')]: %s" % (tenant_id, sg_quota, sg_rule_quota, e))
return False
-def delete_security_group(neutron_client, secgroup_id):
+def delete_security_group(conn, secgroup_id):
try:
- neutron_client.delete_security_group(secgroup_id)
+ conn.network.delete_security_group(secgroup_id, ignore_missing=False)
return True
except Exception as e:
- logger.error("Error [delete_security_group(neutron_client, '%s')]: %s"
+ logger.error("Error [delete_security_group(network, '%s')]: %s"
% (secgroup_id, e))
return False
# *********************************************
# CINDER
# *********************************************
-def get_volumes(cinder_client):
+def get_volumes(conn):
try:
- volumes = cinder_client.volumes.list(search_opts={'all_tenants': 1})
+ volumes = conn.block_store.volumes(all_tenants=1)
return volumes
except Exception as e:
- logger.error("Error [get_volumes(cinder_client)]: %s" % e)
+ logger.error("Error [get_volumes(volume)]: %s" % e)
return None
-def update_cinder_quota(cinder_client, tenant_id, vols_quota,
+def update_cinder_quota(cloud, tenant_id, vols_quota,
snapshots_quota, gigabytes_quota):
quotas_values = {"volumes": vols_quota,
"snapshots": snapshots_quota,
"gigabytes": gigabytes_quota}
try:
- cinder_client.quotas.update(tenant_id, **quotas_values)
+ cloud.set_volume_quotas(tenant_id, **quotas_values)
return True
except Exception as e:
- logger.error("Error [update_cinder_quota(cinder_client, '%s', '%s', "
+ logger.error("Error [update_cinder_quota(volume, '%s', '%s', "
"'%s' '%s')]: %s" % (tenant_id, vols_quota,
snapshots_quota, gigabytes_quota, e))
return False
-def delete_volume(cinder_client, volume_id, forced=False):
+def delete_volume(cloud, volume_id, forced=False):
try:
if forced:
try:
- cinder_client.volumes.detach(volume_id)
+ volume = cloud.get_volume(volume_id)
+ for attachment in volume.attachments:
+ server = cloud.get_server(attachment.server_id)
+ cloud.detach_volume(server, volume)
except:
logger.error(sys.exc_info()[0])
- cinder_client.volumes.force_delete(volume_id)
+ cloud.delete_volume(volume_id, force=True)
else:
- cinder_client.volumes.delete(volume_id)
+ cloud.delete_volume(volume_id)
return True
except Exception as e:
- logger.error("Error [delete_volume(cinder_client, '%s', '%s')]: %s"
+ logger.error("Error [delete_volume(volume, '%s', '%s')]: %s"
% (volume_id, str(forced), e))
return False
common_config.custom_flavor_vcpus)
-def create_net(neutron_client, name):
+def create_net(conn, name):
logger.debug("Creating network %s", name)
- net_id = os_utils.create_neutron_net(neutron_client, name)
+ net_id = os_utils.create_neutron_net(conn, name)
if not net_id:
logger.error(
"There has been a problem when creating the neutron network")
return net_id
-def create_subnet(neutron_client, name, cidr, net_id):
+def create_subnet(conn, name, cidr, net_id):
logger.debug("Creating subnet %s in network %s with cidr %s",
name, net_id, cidr)
- subnet_id = os_utils.create_neutron_subnet(neutron_client,
+ subnet_id = os_utils.create_neutron_subnet(conn,
name,
cidr,
net_id)
return subnet_id
-def create_network(neutron_client, net, subnet1, cidr1,
+def create_network(conn, net, subnet1, cidr1,
router, subnet2=None, cidr2=None):
"""Network assoc won't work for networks/subnets created by this function.
It is an ODL limitation due to it handling routers as vpns.
See https://bugs.opendaylight.org/show_bug.cgi?id=6962"""
- network_dic = os_utils.create_network_full(neutron_client,
+ network_dic = os_utils.create_network_full(conn,
net,
subnet1,
router,
if subnet2 is not None:
logger.debug("Creating and attaching a second subnet...")
subnet_id = os_utils.create_neutron_subnet(
- neutron_client, subnet2, cidr2, net_id)
+ conn, subnet2, cidr2, net_id)
if not subnet_id:
logger.error(
"There has been a problem when creating the second subnet")
return net_id, subnet_id, router_id
-def get_port(neutron_client, instance_id):
- ports = os_utils.get_port_list(neutron_client)
- if ports is not None:
- for port in ports:
- if port['device_id'] == instance_id:
- return port
+def get_port(conn, instance_id):
+ ports = os_utils.get_port_list(conn)
+ for port in ports:
+ if port.device_id == instance_id:
+ return port
return None
-def update_port_allowed_address_pairs(neutron_client, port_id, address_pairs):
+def update_port_allowed_address_pairs(conn, port_id, address_pairs):
if len(address_pairs) <= 0:
return
allowed_address_pairs = []
address_pair_dict = {'ip_address': address_pair.ipaddress,
'mac_address': address_pair.macaddress}
allowed_address_pairs.append(address_pair_dict)
- json_body = {'port': {
- "allowed_address_pairs": allowed_address_pairs
- }}
try:
- port = neutron_client.update_port(port=port_id,
- body=json_body)
- return port['port']['id']
+ port = conn.network.\
+ update_port(port_id, allowed_address_pairs=allowed_address_pairs)
+ return port.id
except Exception as e:
- logger.error("Error [update_neutron_port(neutron_client, '%s', '%s')]:"
+ logger.error("Error [update_neutron_port(network, '%s', '%s')]:"
" %s" % (port_id, address_pairs, e))
return None
return compute_nodes
-def open_icmp(neutron_client, security_group_id):
- if os_utils.check_security_group_rules(neutron_client,
+def open_icmp(conn, security_group_id):
+ if os_utils.check_security_group_rules(conn,
security_group_id,
'ingress',
'icmp'):
- if not os_utils.create_secgroup_rule(neutron_client,
+ if not os_utils.create_secgroup_rule(conn,
security_group_id,
'ingress',
'icmp'):
% security_group_id)
-def open_http_port(neutron_client, security_group_id):
- if os_utils.check_security_group_rules(neutron_client,
+def open_http_port(conn, security_group_id):
+ if os_utils.check_security_group_rules(conn,
security_group_id,
'ingress',
'tcp',
80, 80):
- if not os_utils.create_secgroup_rule(neutron_client,
+ if not os_utils.create_secgroup_rule(conn,
security_group_id,
'ingress',
'tcp',
% security_group_id)
-def open_bgp_port(neutron_client, security_group_id):
- if os_utils.check_security_group_rules(neutron_client,
+def open_bgp_port(conn, security_group_id):
+ if os_utils.check_security_group_rules(conn,
security_group_id,
'ingress',
'tcp',
179, 179):
- if not os_utils.create_secgroup_rule(neutron_client,
+ if not os_utils.create_secgroup_rule(conn,
security_group_id,
'ingress',
'tcp',
compute_node.run_cmd(cmd.format(bridge=bridge))
-def cleanup_neutron(neutron_client, floatingip_ids, bgpvpn_ids, interfaces,
- subnet_ids, router_ids, network_ids):
-
+def cleanup_neutron(conn, neutron_client, floatingip_ids, bgpvpn_ids,
+ interfaces, subnet_ids, router_ids, network_ids):
if len(floatingip_ids) != 0:
for floatingip_id in floatingip_ids:
- if not os_utils.delete_floating_ip(neutron_client, floatingip_id):
+ if not os_utils.delete_floating_ip(conn, floatingip_id):
logger.error('Fail to delete all floating ips. '
'Floating ip with id {} was not deleted.'.
format(floatingip_id))
if len(interfaces) != 0:
for router_id, subnet_id in interfaces:
- if not os_utils.remove_interface_router(neutron_client,
+ if not os_utils.remove_interface_router(conn,
router_id, subnet_id):
logger.error('Fail to delete all interface routers. '
'Interface router with id {} was not deleted.'.
if len(router_ids) != 0:
for router_id in router_ids:
- if not os_utils.remove_gateway_router(neutron_client, router_id):
+ if not os_utils.remove_gateway_router(conn, router_id):
logger.error('Fail to delete all gateway routers. '
'Gateway router with id {} was not deleted.'.
format(router_id))
if len(subnet_ids) != 0:
for subnet_id in subnet_ids:
- if not os_utils.delete_neutron_subnet(neutron_client, subnet_id):
+ if not os_utils.delete_neutron_subnet(conn, subnet_id):
logger.error('Fail to delete all subnets. '
'Subnet with id {} was not deleted.'.
format(subnet_id))
if len(router_ids) != 0:
for router_id in router_ids:
- if not os_utils.delete_neutron_router(neutron_client, router_id):
+ if not os_utils.delete_neutron_router(conn, router_id):
logger.error('Fail to delete all routers. '
'Router with id {} was not deleted.'.
format(router_id))
if len(network_ids) != 0:
for network_id in network_ids:
- if not os_utils.delete_neutron_net(neutron_client, network_id):
+ if not os_utils.delete_neutron_net(conn, network_id):
logger.error('Fail to delete all networks. '
'Network with id {} was not deleted.'.
format(network_id))
return is_secure
-def update_nw_subnet_port_quota(neutron_client, tenant_id, nw_quota,
+def update_nw_subnet_port_quota(conn, tenant_id, nw_quota,
subnet_quota, port_quota, router_quota):
- json_body = {"quota": {
- "network": nw_quota,
- "subnet": subnet_quota,
- "port": port_quota,
- "router": router_quota
- }}
-
try:
- neutron_client.update_quota(tenant_id=tenant_id,
- body=json_body)
+ conn.network.update_quota(tenant_id, networks=nw_quota,
+ subnets=subnet_quota, ports=port_quota,
+ routers=router_quota)
return True
except Exception as e:
- logger.error("Error [update_nw_subnet_port_quota(neutron_client,"
+ logger.error("Error [update_nw_subnet_port_quota(network,"
" '%s', '%s', '%s', '%s, %s')]: %s" %
(tenant_id, nw_quota, subnet_quota,
port_quota, router_quota, e))
return False
-def get_neutron_quota(neutron_client, tenant_id):
+def get_neutron_quota(conn, tenant_id):
try:
- return neutron_client.show_quota(tenant_id=tenant_id)['quota']
+ return conn.network.quotas(project_id=tenant_id).next()
except Exception as e:
- logger.error("Error in getting neutron quota for tenant "
+ logger.error("Error in getting network quota for tenant "
" '%s' )]: %s" % (tenant_id, e))
raise
raise
-def update_router_extra_route(neutron_client, router_id, extra_routes):
+def update_router_extra_route(conn, router_id, extra_routes):
if len(extra_routes) <= 0:
return
routes_list = []
route_dict = {'destination': extra_route.destination,
'nexthop': extra_route.nexthop}
routes_list.append(route_dict)
- json_body = {'router': {
- "routes": routes_list
- }}
try:
- neutron_client.update_router(router_id, body=json_body)
+ conn.network.update_router(router_id, routes=routes_list)
return True
except Exception as e:
logger.error("Error in updating router with extra route: %s" % e)
raise
-def update_router_no_extra_route(neutron_client, router_ids):
- json_body = {'router': {
- "routes": [
- ]}}
-
+def update_router_no_extra_route(conn, router_ids):
for router_id in router_ids:
try:
- neutron_client.update_router(router_id, body=json_body)
+ conn.network.update_router(router_id, routes=[])
return True
except Exception as e:
logger.error("Error in clearing extra route: %s" % e)
def execute(self):
cloud = os_utils.get_os_cloud()
- neutron_client = os_utils.get_neutron_client()
+ conn = os_utils.get_os_connection()
tenant_id = os_utils.get_tenant_id(os_utils.get_keystone_client(),
os.environ['OS_PROJECT_NAME'])
- neutron_quota = test_utils.get_neutron_quota(neutron_client, tenant_id)
+ neutron_quota = test_utils.get_neutron_quota(conn, tenant_id)
(neutron_nw_quota, neutron_subnet_quota, neutron_port_quota,
neutron_router_quota) = (
- neutron_quota['network'], neutron_quota['subnet'],
- neutron_quota['port'], neutron_quota['router'])
+ neutron_quota.networks, neutron_quota.subnets,
+ neutron_quota.ports, neutron_quota.routers)
instances_quota = test_utils.get_nova_instances_quota(cloud)
logger.info("Setting net/subnet/port/router "
"quota to unlimited")
test_utils.update_nw_subnet_port_quota(
- neutron_client,
+ conn,
tenant_id,
COMMON_CONFIG.neutron_nw_quota,
COMMON_CONFIG.neutron_subnet_quota,
# Clean up the stale floating ip's so that required
# ip addresses are available for sdnvpn testcases
logger.info("Cleaning up the Floating IP Addresses")
- floating_ips = os_utils.get_floating_ips(neutron_client)
- if floating_ips is not None:
- for floating_ip in floating_ips:
- os_utils.delete_floating_ip(
- neutron_client, floating_ip['id'])
+ floating_ips = os_utils.get_floating_ips(conn)
+ for floating_ip in floating_ips:
+ os_utils.delete_floating_ip(conn, floating_ip.id)
# Workaround for
# https://jira.opnfv.org/browse/SNAPS-318
# Clean up the stale routers
logger.info("Cleaning up the stale routers")
- ports = os_utils.get_port_list(neutron_client)
- if ports is not None:
- for port in ports:
- if port['device_owner'] == 'network:router_interface':
- os_utils.delete_neutron_port(
- neutron_client, port['id'])
- routers = os_utils.get_router_list(neutron_client)
- if routers is not None:
- for router in routers:
- os_utils.remove_gateway_router(
- neutron_client, router['id'])
- os_utils.delete_neutron_router(
- neutron_client, router['id'])
+ ports = os_utils.get_port_list(conn)
+ for port in ports:
+ if port.device_owner == 'network:router_interface':
+ os_utils.delete_neutron_port(conn, port.id)
+ routers = os_utils.get_router_list(conn)
+ for router in routers:
+ os_utils.remove_gateway_router(conn, router.id)
+ os_utils.delete_neutron_router(conn, router.id)
with open(COMMON_CONFIG.config_file) as f:
config_yaml = yaml.safe_load(f)
overall_status = "FAIL"
logger.info("Resetting subnet/net/port quota")
- test_utils.update_nw_subnet_port_quota(neutron_client,
+ test_utils.update_nw_subnet_port_quota(conn,
tenant_id,
neutron_nw_quota,
neutron_subnet_quota,
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
container="bare", public='public')
image_ids.append(image_id)
- network_1_id = test_utils.create_net(neutron_client,
+ network_1_id = test_utils.create_net(conn,
TESTCASE_CONFIG.net_1_name)
- subnet_1_id = test_utils.create_subnet(neutron_client,
+ subnet_1_id = test_utils.create_subnet(conn,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
network_1_id)
- network_2_id = test_utils.create_net(neutron_client,
+ network_2_id = test_utils.create_net(conn,
TESTCASE_CONFIG.net_2_name)
- subnet_2_id = test_utils.create_subnet(neutron_client,
+ subnet_2_id = test_utils.create_subnet(conn,
TESTCASE_CONFIG.subnet_2_name,
TESTCASE_CONFIG.subnet_2_cidr,
network_2_id)
subnet_ids.extend([subnet_1_id, subnet_2_id])
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
compute_nodes = test_utils.assert_and_get_compute_nodes(conn)
finally:
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
public='public')
image_ids.append(image_id)
- network_1_id = test_utils.create_net(neutron_client,
+ network_1_id = test_utils.create_net(conn,
TESTCASE_CONFIG.net_1_name)
- subnet_1_id = test_utils.create_subnet(neutron_client,
+ subnet_1_id = test_utils.create_subnet(conn,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
network_1_id)
network_ids.append(network_1_id)
subnet_ids.append(subnet_1_id)
- sg_id = os_utils.create_security_group_full(neutron_client,
+ sg_id = os_utils.create_security_group_full(conn,
TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids, bgpvpn_ids,
- interfaces, subnet_ids, router_ids,
- network_ids)
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
+ bgpvpn_ids, interfaces, subnet_ids,
+ router_ids, network_ids)
return results.compile_summary()
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
openstack_nodes = test_utils.get_nodes()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
container="bare", public='public')
image_ids.append(image_id)
- network_1_id = test_utils.create_net(neutron_client,
+ network_1_id = test_utils.create_net(conn,
TESTCASE_CONFIG.net_1_name)
- subnet_1_id = test_utils.create_subnet(neutron_client,
+ subnet_1_id = test_utils.create_subnet(conn,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
network_1_id)
subnet_ids.append(subnet_1_id)
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
# Check required number of compute nodes
# Cleanup topology
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids, bgpvpn_ids,
- interfaces, subnet_ids, router_ids,
- network_ids)
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
+ bgpvpn_ids, interfaces, subnet_ids,
+ router_ids, network_ids)
# Connect again OVS to Controller
for compute_node in compute_nodes:
compute_node.run_cmd("sudo ovs-vsctl set-controller {} {}".
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
openstack_nodes = test_utils.get_nodes()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
container="bare", public='public')
image_ids.append(image_id)
- network_1_id = test_utils.create_net(neutron_client,
+ network_1_id = test_utils.create_net(conn,
TESTCASE_CONFIG.net_1_name)
- subnet_1_id = test_utils.create_subnet(neutron_client,
+ subnet_1_id = test_utils.create_subnet(conn,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
network_1_id)
subnet_ids.append(subnet_1_id)
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
# Check required number of compute nodes
# Cleanup topology
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids, bgpvpn_ids,
- interfaces, subnet_ids, router_ids,
- network_ids)
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
+ bgpvpn_ids, interfaces, subnet_ids,
+ router_ids, network_ids)
return results.compile_summary()
if not os.path.isfile(COMMON_CONFIG.ubuntu_image_path):
logger.info("Downloading image")
image_dest_path = '/'.join(
- COMMON_CONFIG.ubuntu_image_path.split('/')[:-1])
+ COMMON_CONFIG.ubuntu_image_path.split('/')[:-1])
os_utils.download_url(
"http://artifacts.opnfv.org/sdnvpn/"
"ubuntu-16.04-server-cloudimg-amd64-disk1.img",
logger.info("Using old image")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids, flavor_ids) = ([] for i in range(9))
flavor_ids.append(flavor_id)
network_1_id, subnet_1_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
router_ids.extend([router_1_id])
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
compute_nodes = test_utils.assert_and_get_compute_nodes(conn)
userdata=u1)
vm_1_ip = test_utils.get_instance_ip(conn, vm_1)
- vm1_port = test_utils.get_port(neutron_client, vm_1.id)
+ vm1_port = test_utils.get_port(conn, vm_1.id)
test_utils.update_port_allowed_address_pairs(
- neutron_client,
- vm1_port['id'],
+ conn,
+ vm1_port.id,
[test_utils.AllowedAddressPair(
TESTCASE_CONFIG.extra_route_cidr,
- vm1_port['mac_address'])])
+ vm1_port.mac_address)])
vm_2 = test_utils.create_instance(
conn,
userdata=u1)
vm_2_ip = test_utils.get_instance_ip(conn, vm_2)
- vm2_port = test_utils.get_port(neutron_client, vm_2.id)
+ vm2_port = test_utils.get_port(conn, vm_2.id)
test_utils.update_port_allowed_address_pairs(
- neutron_client,
- vm2_port['id'],
+ conn,
+ vm2_port.id,
[test_utils.AllowedAddressPair(
TESTCASE_CONFIG.extra_route_cidr,
- vm2_port['mac_address'])])
+ vm2_port.mac_address)])
test_utils.async_Wait_for_instances([vm_1, vm_2])
neutron_client, bgpvpn_id, router_1_id)
test_utils.update_router_extra_route(
- neutron_client, router_1_id,
+ conn, router_1_id,
[test_utils.ExtraRoute(TESTCASE_CONFIG.extra_route_cidr,
vm_1_ip),
test_utils.ExtraRoute(TESTCASE_CONFIG.extra_route_cidr,
logger.error("exception occurred while executing testcase_13: %s", e)
raise
finally:
- test_utils.update_router_no_extra_route(neutron_client, router_ids)
+ test_utils.update_router_no_extra_route(conn, router_ids)
test_utils.cleanup_nova(conn, instance_ids, flavor_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
image_ids.append(image_id)
network_1_id = test_utils.create_net(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name)
subnet_1a_id = test_utils.create_subnet(
- neutron_client,
+ conn,
TESTCASE_CONFIG.subnet_1a_name,
TESTCASE_CONFIG.subnet_1a_cidr,
network_1_id)
# TODO: uncomment the commented lines once ODL has
# support for mulitple subnets under same neutron network
# subnet_1b_id = test_utils.create_subnet(
- # neutron_client,
+ # conn,
# TESTCASE_CONFIG.subnet_1b_name,
# TESTCASE_CONFIG.subnet_1b_cidr,
# network_1_id)
network_2_id = test_utils.create_net(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_2_name)
# subnet_2a_id = test_utils.create_subnet(
- # neutron_client,
+ # conn,
# TESTCASE_CONFIG.subnet_2a_name,
# TESTCASE_CONFIG.subnet_2a_cidr,
# network_2_id)
subnet_2b_id = test_utils.create_subnet(
- neutron_client,
+ conn,
TESTCASE_CONFIG.subnet_2b_name,
TESTCASE_CONFIG.subnet_2b_cidr,
network_2_id)
subnet_2b_id])
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
compute_nodes = test_utils.assert_and_get_compute_nodes(conn)
finally:
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
if not os.path.isfile(COMMON_CONFIG.ubuntu_image_path):
logger.info("Downloading image")
image_dest_path = '/'.join(
- COMMON_CONFIG.ubuntu_image_path.split('/')[:-1])
+ COMMON_CONFIG.ubuntu_image_path.split('/')[:-1])
os_utils.download_url(
"http://artifacts.opnfv.org/sdnvpn/"
"ubuntu-16.04-server-cloudimg-amd64-disk1.img",
else:
logger.info("Using old image")
- conn = os_utils.get_os_connection()
neutron_client = os_utils.get_neutron_client()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
flavor_ids.append(flavor_id)
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
- test_utils.open_icmp(neutron_client, sg_id)
- test_utils.open_http_port(neutron_client, sg_id)
+ test_utils.open_icmp(conn, sg_id)
+ test_utils.open_http_port(conn, sg_id)
- test_utils.open_bgp_port(neutron_client, sg_id)
+ test_utils.open_bgp_port(conn, sg_id)
image_id = os_utils.create_glance_image(
conn, TESTCASE_CONFIG.image_name,
image_ids.append(image_id)
net_1_id, subnet_1_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
quagga_net_id, subnet_quagga_id, \
router_quagga_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.quagga_net_name,
TESTCASE_CONFIG.quagga_subnet_name,
TESTCASE_CONFIG.quagga_subnet_cidr,
# cloud-init script.
# fake_fip is needed to bypass NAT
# see below for the reason why.
- fake_fip = os_utils.create_floating_ip(neutron_client)
+ fake_fip = os_utils.create_floating_ip(conn)
# pin quagga to some compute
floatingip_ids.append(fake_fip['fip_id'])
compute_node = conn.compute.hypervisors().next()
instance_ids.append(quagga_vm.id)
- quagga_vm_port = test_utils.get_port(neutron_client,
+ quagga_vm_port = test_utils.get_port(conn,
quagga_vm.id)
- fip_added = os_utils.attach_floating_ip(neutron_client,
- quagga_vm_port['id'])
+ fip_added = os_utils.attach_floating_ip(conn,
+ quagga_vm_port.id)
msg = ("Assign a Floating IP to %s " %
TESTCASE_CONFIG.quagga_instance_name)
if fip_added:
results.add_success(msg)
- floatingip_ids.append(fip_added['floatingip']['id'])
+ floatingip_ids.append(fip_added.id)
else:
results.add_failure(msg)
test_utils.detach_instance_from_ext_br(quagga_vm, compute)
test_utils.cleanup_nova(conn, instance_ids, flavor_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
if fake_fip is not None:
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
image_ids.append(image_id)
network_1_id, subnet_1_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
TESTCASE_CONFIG.router_1_name)
network_2_id = test_utils.create_net(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_2_name)
subnet_2_id = test_utils.create_subnet(
- neutron_client,
+ conn,
TESTCASE_CONFIG.subnet_2_name,
TESTCASE_CONFIG.subnet_2_cidr,
network_2_id)
subnet_ids.extend([subnet_1_id, subnet_2_id])
sg_id = os_utils.create_security_group_full(
- neutron_client,
+ conn,
TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
finally:
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
image_ids.append(image_id)
network_1_id, subnet_1_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
TESTCASE_CONFIG.router_1_name)
network_2_id, subnet_2_id, router_2_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_2_name,
TESTCASE_CONFIG.subnet_2_name,
TESTCASE_CONFIG.subnet_2_cidr,
subnet_ids.extend([subnet_1_id, subnet_2_id])
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
- test_utils.open_icmp(neutron_client, sg_id)
- test_utils.open_http_port(neutron_client, sg_id)
+ test_utils.open_icmp(conn, sg_id)
+ test_utils.open_http_port(conn, sg_id)
vm_2 = test_utils.create_instance(
conn,
results.record_action(msg)
results.add_to_summary(0, '-')
- vm2_port = test_utils.get_port(neutron_client,
- vm_2.id)
- fip_added = os_utils.attach_floating_ip(neutron_client,
- vm2_port['id'])
+ vm2_port = test_utils.get_port(conn, vm_2.id)
+ fip_added = os_utils.attach_floating_ip(conn, vm2_port.id)
if fip_added:
results.add_success(msg)
else:
results.add_failure(msg)
- results.ping_ip_test(fip_added['floatingip']['floating_ip_address'])
+ results.ping_ip_test(fip_added.floating_ip_address)
- floatingip_ids.append(fip_added['floatingip']['id'])
+ floatingip_ids.append(fip_added.id)
except Exception as e:
logger.error("exception occurred while executing testcase_7: %s", e)
finally:
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)
results.add_to_summary(0, "=")
neutron_client = os_utils.get_neutron_client()
- conn = os_utils.get_os_connection()
(floatingip_ids, instance_ids, router_ids, network_ids, image_ids,
subnet_ids, interfaces, bgpvpn_ids) = ([] for i in range(8))
image_ids.append(image_id)
network_1_id, subnet_1_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_1_name,
TESTCASE_CONFIG.subnet_1_name,
TESTCASE_CONFIG.subnet_1_cidr,
TESTCASE_CONFIG.router_1_name)
network_2_id, subnet_2_id, router_1_id = test_utils.create_network(
- neutron_client,
+ conn,
TESTCASE_CONFIG.net_2_name,
TESTCASE_CONFIG.subnet_2_name,
TESTCASE_CONFIG.subnet_2_cidr,
subnet_ids.extend([subnet_1_id, subnet_2_id])
sg_id = os_utils.create_security_group_full(
- neutron_client, TESTCASE_CONFIG.secgroup_name,
+ conn, TESTCASE_CONFIG.secgroup_name,
TESTCASE_CONFIG.secgroup_descr)
- test_utils.open_icmp(neutron_client, sg_id)
- test_utils.open_http_port(neutron_client, sg_id)
+ test_utils.open_icmp(conn, sg_id)
+ test_utils.open_http_port(conn, sg_id)
compute_nodes = test_utils.assert_and_get_compute_nodes(conn)
av_zone_1 = "nova:" + compute_nodes[0]
msg = "Assign a Floating IP to %s" % vm_1.name
results.record_action(msg)
- vm1_port = test_utils.get_port(neutron_client, vm_1.id)
- fip_added = os_utils.attach_floating_ip(neutron_client,
- vm1_port['id'])
+ vm1_port = test_utils.get_port(conn, vm_1.id)
+ fip_added = os_utils.attach_floating_ip(conn, vm1_port.id)
if fip_added:
results.add_success(msg)
else:
results.add_failure(msg)
- fip = fip_added['floatingip']['floating_ip_address']
+ fip = fip_added.floating_ip_address
results.add_to_summary(0, "=")
results.record_action("Ping %s via Floating IP" % vm_1.name)
results.add_to_summary(0, "-")
results.ping_ip_test(fip)
- floatingip_ids.append(fip_added['floatingip']['id'])
+ floatingip_ids.append(fip_added.id)
except Exception as e:
logger.error("exception occurred while executing testcase_8: %s", e)
finally:
test_utils.cleanup_nova(conn, instance_ids)
test_utils.cleanup_glance(conn, image_ids)
- test_utils.cleanup_neutron(neutron_client, floatingip_ids,
+ test_utils.cleanup_neutron(conn, neutron_client, floatingip_ids,
bgpvpn_ids, interfaces, subnet_ids,
router_ids, network_ids)