opnfv_vnf.socket = 0
opnfv_vnf.start_core = 0
opnfv_vnf.update_write_parser = mock.MagicMock()
- self.assertEqual(None, opnfv_vnf.update_timer())
+ self.assertIsNone(opnfv_vnf.update_timer())
def test_generate_script(self):
topology_file = mock.Mock()
opnfv_vnf.worker_config = '1t'
opnfv_vnf.start_core = 0
result = opnfv_vnf.generate_next_core_id()
- self.assertEqual(None, result)
+ self.assertIsNone(result)
opnfv_vnf.worker_config = '2t'
opnfv_vnf.start_core = 'a'
self.assertRaises(ValueError, opnfv_vnf.generate_next_core_id)
opnfv_vnf.worker_config = '2t'
opnfv_vnf.start_core = 1
result = opnfv_vnf.generate_next_core_id()
- self.assertEqual(None, result)
+ self.assertIsNone(result)
def test_generate_lb_to_port_pair_mapping(self):
topology_file = mock.Mock()
opnfv_vnf._port_pairs = samplevnf_helper.PortPairs(vnfd_mock.interfaces)
opnfv_vnf.port_pair_list = opnfv_vnf._port_pairs.port_pair_list
result = opnfv_vnf.generate_lb_to_port_pair_mapping()
- self.assertEqual(None, result)
+ self.assertIsNone(result)
result = opnfv_vnf.set_priv_to_pub_mapping()
self.assertEqual('(0,1)', result)
opnfv_vnf.start_core = 0
opnfv_vnf.lb_count = 1
result = opnfv_vnf.set_priv_que_handler()
- self.assertEqual(None, result)
+ self.assertIsNone(result)
def test_generate_arp_route_tbl(self):
# ELF: could n=do this in setup
self.amqp_consumer._connection.add_on_close_callback = \
mock.Mock(return_value=0)
self.amqp_consumer._connection.channel = mock.Mock(return_value=0)
- self.assertEqual(None, self.amqp_consumer.on_connection_open(10))
+ self.assertIsNone(self.amqp_consumer.on_connection_open(10))
def test_on_connection_closed(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
self.amqp_consumer._connection.add_timeout = mock.Mock(return_value=0)
self.amqp_consumer._closing = True
- self.assertEqual(None,
- self.amqp_consumer.on_connection_closed("", 404,
- "Not Found"))
+ self.assertIsNone(
+ self.amqp_consumer.on_connection_closed("", 404, "Not Found"))
self.amqp_consumer._closing = False
- self.assertEqual(None,
- self.amqp_consumer.on_connection_closed("", 404,
- "Not Found"))
+ self.assertIsNone(
+ self.amqp_consumer.on_connection_closed("", 404, "Not Found"))
def test_reconnect(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer._connection.ioloop.stop = mock.Mock(return_value=0)
self.amqp_consumer.connect = mock.Mock(return_value=0)
self.amqp_consumer._closing = True
- self.assertEqual(None, self.amqp_consumer.reconnect())
+ self.assertIsNone(self.amqp_consumer.reconnect())
def test_on_channel_open(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer.add_on_channel_close_callback = mock.Mock()
self.amqp_consumer._channel.exchange_declare = \
mock.Mock(return_value=0)
- self.assertEqual(None,
- self.amqp_consumer.on_channel_open(
- self.amqp_consumer._channel))
+ self.assertIsNone(
+ self.amqp_consumer.on_channel_open(self.amqp_consumer._channel))
def test_add_on_channel_close_callback(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
mock.Mock(return_value=0)
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
- self.assertEqual(None,
- self.amqp_consumer.add_on_channel_close_callback())
+ self.assertIsNone(self.amqp_consumer.add_on_channel_close_callback())
def test_on_channel_closed(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer._connection.close = mock.Mock(return_value=0)
_channel = mock.Mock()
- self.assertEqual(None,
- self.amqp_consumer.on_channel_closed(_channel,
- "", ""))
+ self.assertIsNone(
+ self.amqp_consumer.on_channel_closed(_channel, "", ""))
def test_ion_exchange_declareok(self):
self.amqp_consumer.setup_queue = mock.Mock(return_value=0)
- self.assertEqual(None, self.amqp_consumer.on_exchange_declareok(10))
+ self.assertIsNone(self.amqp_consumer.on_exchange_declareok(10))
def test_setup_queue(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.add_on_close_callback = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.setup_queue("collectd"))
+ self.assertIsNone(self.amqp_consumer.setup_queue("collectd"))
def test_on_queue_declareok(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.queue_bind = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.on_queue_declareok(10))
+ self.assertIsNone(self.amqp_consumer.on_queue_declareok(10))
def test__on_bindok(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.basic_consume = mock.Mock()
self.amqp_consumer.add_on_cancel_callback = mock.Mock()
- self.assertEqual(None, self.amqp_consumer._on_bindok(10))
+ self.assertIsNone(self.amqp_consumer._on_bindok(10))
def test_add_on_cancel_callback(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.add_on_cancel_callback = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.add_on_cancel_callback())
+ self.assertIsNone(self.amqp_consumer.add_on_cancel_callback())
def test_on_consumer_cancelled(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.close = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.on_consumer_cancelled(10))
+ self.assertIsNone(self.amqp_consumer.on_consumer_cancelled(10))
def test_on_message(self):
body = "msg {} cpu/cpu-0/ipc 101010:10"
basic_deliver = mock.Mock()
basic_deliver.delivery_tag = mock.Mock(return_value=0)
self.amqp_consumer.ack_message = mock.Mock()
- self.assertEqual(None,
- self.amqp_consumer.on_message(10, basic_deliver,
- properties, body))
+ self.assertIsNone(
+ self.amqp_consumer.on_message(10, basic_deliver, properties, body))
def test_ack_message(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.basic_ack = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.ack_message(10))
+ self.assertIsNone(self.amqp_consumer.ack_message(10))
def test_on_cancelok(self):
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.close = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.on_cancelok(10))
+ self.assertIsNone(self.amqp_consumer.on_cancelok(10))
def test_run(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer.connect = mock.Mock()
self.amqp_consumer._connection.ioloop.start = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.run())
+ self.assertIsNone(self.amqp_consumer.run())
def test_stop(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer._connection.ioloop.start = mock.Mock()
self.amqp_consumer._channel = mock.Mock()
self.amqp_consumer._channel.basic_cancel = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.stop())
+ self.assertIsNone(self.amqp_consumer.stop())
def test_close_connection(self):
self.amqp_consumer._connection = mock.Mock(autospec=AmqpConsumer)
self.amqp_consumer._connection.close = mock.Mock()
- self.assertEqual(None, self.amqp_consumer.close_connection())
+ self.assertIsNone(self.amqp_consumer.close_connection())
self.resource_profile.run_collectd_amqp = \
mock.Mock(return_value=0)
res = self.resource_profile.amqp_process_for_nfvi_kpi()
- self.assertEqual(None, res)
+ self.assertIsNone(res)
def test_amqp_collect_nfvi_kpi(self):
self.resource_profile.amqp_client = \
fixed_profile = FixedProfile(self.TRAFFIC_PROFILE)
fixed_profile.params = self.TRAFFIC_PROFILE
fixed_profile.first_run = True
- self.assertEqual(None, fixed_profile.execute(traffic_generator))
+ self.assertIsNone(fixed_profile.execute(traffic_generator))
traffic_profile_generic_htt_p = \
TrafficProfileGenericHTTP(TrafficProfile)
traffic_generator = {}
- self.assertEqual(None,
- traffic_profile_generic_htt_p.execute(
- traffic_generator))
+ self.assertIsNone(
+ traffic_profile_generic_htt_p.execute(traffic_generator))
def test__send_http_request(self):
traffic_profile_generic_htt_p = \
TrafficProfileGenericHTTP(TrafficProfile)
- self.assertEqual(None,
- traffic_profile_generic_htt_p._send_http_request(
+ self.assertIsNone(traffic_profile_generic_htt_p._send_http_request(
"10.1.1.1", "250", "/req"))
r_f_c2544_profile.get_multiplier = mock.Mock()
r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
ixia_obj = mock.MagicMock()
- self.assertEqual(None, r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj))
+ self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator, ixia_obj))
def test_update_traffic_profile(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
mock.Mock(return_value={})
r_f_c2544_profile.full_profile = {}
r_f_c2544_profile._ixia_traffic_generate = mock.Mock()
- self.assertEqual(
- None,
+ self.assertIsNone(
r_f_c2544_profile.start_ixia_latency(traffic_generator, ixia_obj))
r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.params = self.PROFILE
r_f_c2544_profile.first_run = True
- self.assertEqual(None, r_f_c2544_profile.execute_traffic(traffic_generator))
+ self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator))
def test_get_drop_percentage(self):
traffic_generator = mock.Mock(autospec=TrexProfile)
mock.Mock(return_value=True)
r_f_c2544_profile = RFC2544Profile(self.TRAFFIC_PROFILE)
r_f_c2544_profile.params = self.PROFILE
- self.assertIsNone(
- r_f_c2544_profile.execute_traffic(traffic_generator))
+ self.assertIsNone(r_f_c2544_profile.execute_traffic(traffic_generator))
samples = {}
for ifname in range(1):
name = "xe{}".format(ifname)
trex_profile = \
TrexProfile(TrafficProfile)
- self.assertEqual(None, trex_profile.set_qinq(qinq))
+ self.assertIsNone(trex_profile.set_qinq(qinq))
qinq = {"S-VLAN": {"id": "128-130", "priority": 0, "cfi": 0},
"C-VLAN": {"id": "512-515", "priority": 0, "cfi": 0}}
- self.assertEqual(None, trex_profile.set_qinq(qinq))
+ self.assertIsNone(trex_profile.set_qinq(qinq))
def test__set_outer_l2_fields(self):
trex_profile = \
"C-VLAN": {"id": 512, "priority": 0, "cfi": 0}}
outer_l2 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l2']
outer_l2['QinQ'] = qinq
- self.assertEqual(None, trex_profile._set_outer_l2_fields(outer_l2))
+ self.assertIsNone(trex_profile._set_outer_l2_fields(outer_l2))
def test__set_outer_l3v4_fields(self):
trex_profile = \
TrexProfile(TrafficProfile)
outer_l3v4 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l3v4']
outer_l3v4['proto'] = 'tcp'
- self.assertEqual(None, trex_profile._set_outer_l3v4_fields(outer_l3v4))
+ self.assertIsNone(trex_profile._set_outer_l3v4_fields(outer_l3v4))
def test__set_outer_l3v6_fields(self):
trex_profile = \
outer_l3v6['proto'] = 'tcp'
outer_l3v6['tc'] = 1
outer_l3v6['hlim'] = 10
- self.assertEqual(None, trex_profile._set_outer_l3v6_fields(outer_l3v6))
+ self.assertIsNone(trex_profile._set_outer_l3v6_fields(outer_l3v6))
def test__set_outer_l4_fields(self):
trex_profile = \
TrexProfile(TrafficProfile)
outer_l4 = self.PROFILE[TrafficProfile.UPLINK]['ipv4']['outer_l4']
- self.assertEqual(None, trex_profile._set_outer_l4_fields(outer_l4))
+ self.assertIsNone(trex_profile._set_outer_l4_fields(outer_l4))
def test_get_streams(self):
trex_profile = \
acl_approx_vnf.vnf_execute = mock.MagicMock()
acl_approx_vnf.dpdk_devbind = "dpdk-devbind.py"
acl_approx_vnf._resource_collect_stop = mock.Mock()
- self.assertEqual(None, acl_approx_vnf.terminate())
+ self.assertIsNone(acl_approx_vnf.terminate())
def test_close(self):
queue_file_wrapper = \
base.QueueFileWrapper(self.q_in, self.q_out, self.prompt)
- self.assertEqual(None, queue_file_wrapper.close())
+ self.assertIsNone(queue_file_wrapper.close())
def test_read(self):
queue_file_wrapper = \
router_vnf = RouterVNF(name, vnfd)
router_vnf._vnf_process = mock.MagicMock()
router_vnf._vnf_process.terminate = mock.Mock()
- self.assertEqual(None, router_vnf.terminate())
+ self.assertIsNone(router_vnf.terminate())
if __name__ == '__main__':
unittest.main()
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
ixload_traffic_gen = IxLoadTrafficGen(NAME, vnfd)
- self.assertEqual(None, ixload_traffic_gen.listen_traffic({}))
+ self.assertIsNone(ixload_traffic_gen.listen_traffic({}))
@mock.patch.object(utils, 'find_relative_file')
@mock.patch.object(utils, 'makedirs')
def test_terminate(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
ixload_traffic_gen = IxLoadTrafficGen(NAME, vnfd)
- self.assertEqual(None, ixload_traffic_gen.terminate())
+ self.assertIsNone(ixload_traffic_gen.terminate())
@mock.patch("yardstick.network_services.vnf_generic.vnf.tg_ixload.call")
@mock.patch.object(ssh, 'SSH')
prox_traffic_gen._vnf_wrapper.setup_helper = mock.MagicMock()
prox_traffic_gen._vnf_wrapper._vnf_process = mock.MagicMock()
prox_traffic_gen._vnf_wrapper.resource_helper = mock.MagicMock()
- self.assertEqual(None, prox_traffic_gen.terminate())
+ self.assertIsNone(prox_traffic_gen.terminate())
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
ixnet_traffic_gen = IxiaTrafficGen(NAME, vnfd)
- self.assertEqual(None, ixnet_traffic_gen.listen_traffic({}))
+ self.assertIsNone(ixnet_traffic_gen.listen_traffic({}))
def test_instantiate(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ixnet_traffic_gen._ixia_traffic_gen.ix_stop_traffic = mock.Mock()
ixnet_traffic_gen._traffic_process = mock.MagicMock()
ixnet_traffic_gen._traffic_process.terminate = mock.Mock()
- self.assertEqual(None, ixnet_traffic_gen.terminate())
+ self.assertIsNone(ixnet_traffic_gen.terminate())
def _get_file_abspath(self, filename):
curr_path = os.path.dirname(os.path.abspath(__file__))
@mock.patch("{}.AnsibleCommon".format(PREFIX))
def test__do_ansible_job(self, *args):
- self.assertEqual(None, self.test_context._do_ansible_job('dummy'))
+ self.assertIsNone(self.test_context._do_ansible_job('dummy'))
def test_init(self):
self.test_context.init(self.attrs)
mock_dispatcher.get = mock.MagicMock(return_value=[dispatcher1,
dispatcher2])
- self.assertEqual(None, t._do_output(output_config, {}))
+ self.assertIsNone(t._do_output(output_config, {}))
@mock.patch.object(task, 'Context')
def test_parse_networks_from_nodes(self, mock_context):
self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
self.s._fill_traffic_profile = \
mock.Mock(return_value=TRAFFIC_PROFILE)
- self.assertEqual(None, self.s.setup())
+ self.assertIsNone(self.s.setup())
def test_setup_exception(self):
with mock.patch("yardstick.ssh.SSH") as ssh:
def test_ansible_node_getattr(self):
a = ansible_common.AnsibleNode({"name": "name"})
- self.assertEqual(getattr(a, "nosuch", None), None)
+ self.assertIsNone(getattr(a, "nosuch", None))
class AnsibleNodeDictTestCase(unittest.TestCase):
output = openstack_utils.get_network_id(mock_shade_client,
'network_name')
- self.assertEqual(None, output)
+ self.assertIsNone(output)
class DeleteNeutronNetTestCase(unittest.TestCase):
subprocess.check_output = mock.Mock(return_value=0)
args = {"vnf": "vpe",
"test": "tc_baremetal_rfc2544_ipv4_1flow_1518B.yaml"}
- self.assertEqual(None, yardstick_ns_cli.run_test(args, test_path))
+ self.assertIsNone(yardstick_ns_cli.run_test(args, test_path))
os.chdir(cur_dir)
args = {"vnf": "vpe1"}
- self.assertEqual(None, yardstick_ns_cli.run_test(args, test_path))
+ self.assertIsNone(yardstick_ns_cli.run_test(args, test_path))
os.chdir(cur_dir)
args = {"vnf": "vpe",
"test": "tc_baremetal_rfc2544_ipv4_1flow_1518B.yaml."}
- self.assertEqual(None, yardstick_ns_cli.run_test(args, test_path))
+ self.assertIsNone(yardstick_ns_cli.run_test(args, test_path))
os.chdir(cur_dir)
args = []
- self.assertEqual(None, yardstick_ns_cli.run_test(args, test_path))
+ self.assertIsNone(yardstick_ns_cli.run_test(args, test_path))
os.chdir(cur_dir)
def test_terminate_if_less_options(self):