'external-interface':
[{'virtual-interface':
{'dst_mac': '00:00:00:00:00:04',
+ 'vld_id': 'uplink_0',
'vpci': '0000:05:00.0',
'local_ip': '152.16.100.19',
'type': 'PCI-PASSTHROUGH',
'name': 'xe0'},
{'virtual-interface':
{'dst_mac': '00:00:00:00:00:03',
+ 'vld_id': 'downlink_0',
'vpci': '0000:05:00.1',
'local_ip': '152.16.40.19',
'type': 'PCI-PASSTHROUGH',
def test___init__(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.resource_helper.data)
+ def test_update_gateways(self):
+ vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
+ links = {'uplink_0': {'ip': {}},
+ 'downlink_1': {'ip': {}}}
+
+ ixload_traffic_gen.update_gateways(links)
+
+ self.assertEqual("152.16.100.20", links["uplink_0"]["ip"]["gateway"])
+ self.assertEqual("0.0.0.0", links["downlink_1"]["ip"]["gateway"])
+
@mock.patch.object(ctx_base.Context, 'get_physical_node_from_server',
return_value='mock_node')
def test_collect_kpi(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
ixload_traffic_gen.scenario_helper.scenario_cfg = {
'nodes': {ixload_traffic_gen.name: "mock"}
}
def test_listen_traffic(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.listen_traffic({}))
@mock.patch.object(utils, 'find_relative_file')
@mock.patch.object(tg_ixload, 'shutil')
def test_instantiate(self, mock_shutil, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
scenario_cfg = {'tc': "nsb_test_case",
'ixia_profile': "ixload.cfg",
'task_path': "/path/to/task"}
def test_run_traffic(self, *args):
mock_traffic_profile = mock.Mock(autospec=tp_base.TrafficProfile)
mock_traffic_profile.get_traffic_definition.return_value = '64'
+ mock_traffic_profile.get_links_param.return_value = {
+ 'uplink_0': {'ip': {}}}
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
vnfd['mgmt-interface'].update({'tg-config': {}})
vnfd['mgmt-interface']['tg-config'].update({'ixchassis': '1.1.1.1'})
vnfd['mgmt-interface']['tg-config'].update({'py_bin_path': '/root'})
- sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
sut.connection = mock.Mock()
sut._traffic_runner = mock.Mock(return_value=0)
result = sut.run_traffic(mock_traffic_profile)
def test_run_traffic_csv(self, *args):
mock_traffic_profile = mock.Mock(autospec=tp_base.TrafficProfile)
mock_traffic_profile.get_traffic_definition.return_value = '64'
+ mock_traffic_profile.get_links_param.return_value = {
+ 'uplink_0': {'ip': {}}}
mock_traffic_profile.params = self.TRAFFIC_PROFILE
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
vnfd['mgmt-interface'].update({'tg-config': {}})
vnfd['mgmt-interface']['tg-config'].update({'ixchassis': '1.1.1.1'})
vnfd['mgmt-interface']['tg-config'].update({'py_bin_path': '/root'})
- sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ sut = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
sut.connection = mock.Mock()
sut._traffic_runner = mock.Mock(return_value=0)
subprocess.call(['touch', '/tmp/1.csv'])
def test_terminate(self):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
self.assertIsNone(ixload_traffic_gen.terminate())
def test_parse_csv_read(self):
'HTTP Transaction Rate': True,
}
http_reader = [kpi_data]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
result = ixload_traffic_gen.resource_helper.result
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)
for k_left, k_right in tg_ixload.IxLoadResourceHelper.KPI_LIST.items():
'HTTP Connection Rate': 4,
'HTTP Transaction Rate': 5,
}]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
init_value = ixload_traffic_gen.resource_helper.result
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)
self.assertDictEqual(ixload_traffic_gen.resource_helper.result,
'HTTP Concurrent Connections': 3,
'HTTP Transaction Rate': 5,
}]
- ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd)
+ ixload_traffic_gen = tg_ixload.IxLoadTrafficGen(NAME, vnfd, 'task_id')
+
with self.assertRaises(KeyError):
ixload_traffic_gen.resource_helper.parse_csv_read(http_reader)