+++ /dev/null
-#!/usr/bin/env python
-
-# Copyright 2017 Nokia
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# Unittest for yardstick.benchmark.scenarios.networking.vsperf.VsperfDPDK
-
-from __future__ import absolute_import
-try:
- from unittest import mock
-except ImportError:
- import mock
-import unittest
-
-from yardstick.benchmark.scenarios.networking import vsperf_dpdk
-
-
-@mock.patch('yardstick.benchmark.scenarios.networking.vsperf_dpdk.subprocess')
-class VsperfDPDKTestCase(unittest.TestCase):
-
- def setUp(self):
- self.ctx = {
- "host": {
- "ip": "10.229.47.137",
- "user": "ubuntu",
- "password": "ubuntu",
- },
- }
- self.args = {
- 'task_id': "1234-5678",
- 'options': {
- 'testname': 'pvp_tput',
- 'traffic_type': 'rfc2544_throughput',
- 'frame_size': '64',
- 'test_params': 'TRAFFICGEN_DURATION=30;',
- 'trafficgen_port1': 'ens4',
- 'trafficgen_port2': 'ens5',
- 'conf_file': 'vsperf-yardstick.conf',
- 'setup_script': 'setup_yardstick.sh',
- 'moongen_helper_file': '~/moongen.py',
- 'moongen_host_ip': '10.5.201.151',
- 'moongen_port1_mac': '8c:dc:d4:ae:7c:5c',
- 'moongen_port2_mac': '8c:dc:d4:ae:7c:5d',
- 'trafficgen_port1_nw': 'test2',
- 'trafficgen_port2_nw': 'test3',
- },
- 'sla': {
- 'metrics': 'throughput_rx_fps',
- 'throughput_rx_fps': 500000,
- 'action': 'monitor',
- }
- }
-
- self._mock_ssh = mock.patch(
- 'yardstick.benchmark.scenarios.networking.vsperf_dpdk.ssh')
- self.mock_ssh = self._mock_ssh.start()
-
- self.addCleanup(self._cleanup)
-
- def _cleanup(self):
- self._mock_ssh.stop()
-
- def test_vsperf_dpdk_setup(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- def test_vsperf_dpdk_teardown(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- p.teardown()
- self.assertFalse(p.setup_done)
-
- def test_vsperf_dpdk_is_dpdk_setup_no(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # is_dpdk_setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
-
- result = p._is_dpdk_setup()
- self.assertFalse(result)
-
- def test_vsperf_dpdk_is_dpdk_setup_yes(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # is_dpdk_setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
-
- result = p._is_dpdk_setup()
- self.assertTrue(result)
-
- @mock.patch('time.sleep')
- def test_vsperf_dpdk_dpdk_setup_first(self, _, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # is_dpdk_setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
-
- p.dpdk_setup()
- self.assertFalse(p._is_dpdk_setup())
- self.assertTrue(p.dpdk_setup_done)
-
- @mock.patch('time.sleep')
- def test_vsperf_dpdk_dpdk_setup_next(self, _, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- p.dpdk_setup()
- self.assertTrue(p._is_dpdk_setup())
- self.assertTrue(p.dpdk_setup_done)
-
- @mock.patch('time.sleep')
- def test_vsperf_dpdk_dpdk_setup_fail(self, _, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
- self.assertTrue(p.setup_done)
-
- self.assertRaises(RuntimeError, p.dpdk_setup)
-
- @mock.patch('time.sleep')
- def test_vsperf_dpdk_run_ok(self, _, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # run() specific mocks
- mock_subprocess.call().execute.return_value = None
- self.mock_ssh.SSH.from_node().execute.return_value = (
- 0, 'throughput_rx_fps\r\n14797660.000\r\n', '')
-
- result = {}
- p.run(result)
-
- self.assertEqual(result['throughput_rx_fps'], '14797660.000')
-
- def test_vsperf_dpdk_run_falied_vsperf_execution(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # run() specific mocks
- mock_subprocess.call().execute.return_value = None
- mock_subprocess.call().execute.return_value = None
- self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
-
- result = {}
- self.assertRaises(RuntimeError, p.run, result)
-
- def test_vsperf_dpdk_run_falied_csv_report(self, mock_subprocess):
- p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
-
- # setup() specific mocks
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- mock_subprocess.call().execute.return_value = None
-
- p.setup()
- self.assertIsNotNone(p.client)
- self.assertTrue(p.setup_done)
-
- # run() specific mocks
- mock_subprocess.call().execute.return_value = None
- mock_subprocess.call().execute.return_value = None
- self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
- self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
-
- result = {}
- self.assertRaises(RuntimeError, p.run, result)
-
-def main():
- unittest.main()
-
-
-if __name__ == '__main__':
- main()
import mock
from xml.etree import ElementTree
-
-from yardstick.benchmark.contexts.standalone.model import Libvirt
from yardstick.benchmark.contexts.standalone import model
from yardstick.network_services import utils
def _cleanup(self):
self._mock_write_xml.stop()
+ # TODO: Remove mocking of yardstick.ssh.SSH (here and elsewhere)
+ # In this case, we are mocking a param to be passed into other methods
+ # It can be a generic Mock() with return values set for the right methods
def test_check_if_vm_exists_and_delete(self):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
status = model.Libvirt.build_vm_xml(ssh_mock, {}, cfg_file, 'vm_0', 0)
self.assertEqual(status[0], result[0])
+ # TODO: Edit this test to test state instead of output
+ # update_interrupts_hugepages_perf does not return anything
def test_update_interrupts_hugepages_perf(self):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
# None, this check is trivial.
#status = Libvirt.update_interrupts_hugepages_perf(ssh_mock)
#self.assertIsNone(status)
- Libvirt.update_interrupts_hugepages_perf(ssh_mock)
+ model.Libvirt.update_interrupts_hugepages_perf(ssh_mock)
- @mock.patch("yardstick.benchmark.contexts.standalone.model.CpuSysCores")
+ @mock.patch.object(model, 'CpuSysCores')
@mock.patch.object(model.Libvirt, 'update_interrupts_hugepages_perf')
def test_pin_vcpu_for_perf(self, *args):
# NOTE(ralonsoh): test mocked methods/variables.
ssh_mock.execute = \
mock.Mock(return_value=(0, "a", ""))
ssh.return_value = ssh_mock
- status = Libvirt.pin_vcpu_for_perf(ssh_mock, 4)
+ status = model.Libvirt.pin_vcpu_for_perf(ssh_mock, 4)
self.assertIsNotNone(status)
class StandaloneContextHelperTestCase(unittest.TestCase):
NETWORKS = {
'mgmt': {'cidr': '152.16.100.10/24'},
'private_0': {
- 'phy_port': "0000:05:00.0",
- 'vpci': "0000:00:07.0",
- 'cidr': '152.16.100.10/24',
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.0",
+ 'vpci': "0000:00:07.0",
+ 'cidr': '152.16.100.10/24',
+ 'gateway_ip': '152.16.100.20'},
'public_0': {
- 'phy_port': "0000:05:00.1",
- 'vpci': "0000:00:08.0",
- 'cidr': '152.16.40.10/24',
- 'gateway_ip': '152.16.100.20'}
+ 'phy_port': "0000:05:00.1",
+ 'vpci': "0000:00:08.0",
+ 'cidr': '152.16.40.10/24',
+ 'gateway_ip': '152.16.100.20'}
}
def setUp(self):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
- mock.Mock(return_value=(1, pattern, ""))
+ mock.Mock(return_value=(1, pattern, ""))
ssh.return_value = ssh_mock
# NOTE(ralonsoh): this test doesn't cover function execution. This test
# should also check mocked function calls.
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
- mock.Mock(return_value=(1, "", ""))
+ mock.Mock(return_value=(1, "", ""))
ssh.return_value = ssh_mock
# NOTE(ralonsoh): this test doesn't cover function execution. This test
# should also check mocked function calls.
NETWORKS = {
'mgmt': {'cidr': '152.16.100.10/24'},
'private_0': {
- 'phy_port': "0000:05:00.0",
- 'vpci': "0000:00:07.0",
- 'driver': 'i40e',
- 'mac': '',
- 'cidr': '152.16.100.10/24',
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.0",
+ 'vpci': "0000:00:07.0",
+ 'driver': 'i40e',
+ 'mac': '',
+ 'cidr': '152.16.100.10/24',
+ 'gateway_ip': '152.16.100.20'},
'public_0': {
- 'phy_port': "0000:05:00.1",
- 'vpci': "0000:00:08.0",
- 'driver': 'i40e',
- 'mac': '',
- 'cidr': '152.16.40.10/24',
- 'gateway_ip': '152.16.100.20'}
+ 'phy_port': "0000:05:00.1",
+ 'vpci': "0000:00:08.0",
+ 'driver': 'i40e',
+ 'mac': '',
+ 'cidr': '152.16.40.10/24',
+ 'gateway_ip': '152.16.100.20'}
}
def setUp(self):
{}, self.NETWORKS, '1.1.1.1/24', 'vm_0', vnf, '00:00:00:00:00:01')
self.assertIsNotNone(status)
+
class OvsDeployTestCase(unittest.TestCase):
NETWORKS = {
'mgmt': {'cidr': '152.16.100.10/24'},
'private_0': {
- 'phy_port': "0000:05:00.0",
- 'vpci': "0000:00:07.0",
- 'driver': 'i40e',
- 'mac': '',
- 'cidr': '152.16.100.10/24',
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.0",
+ 'vpci': "0000:00:07.0",
+ 'driver': 'i40e',
+ 'mac': '',
+ 'cidr': '152.16.100.10/24',
+ 'gateway_ip': '152.16.100.20'},
'public_0': {
- 'phy_port': "0000:05:00.1",
- 'vpci': "0000:00:08.0",
- 'driver': 'i40e',
- 'mac': '',
- 'cidr': '152.16.40.10/24',
- 'gateway_ip': '152.16.100.20'}
+ 'phy_port': "0000:05:00.1",
+ 'vpci': "0000:00:08.0",
+ 'driver': 'i40e',
+ 'mac': '',
+ 'cidr': '152.16.40.10/24',
+ 'gateway_ip': '152.16.100.20'}
}
+
@mock.patch('yardstick.ssh.SSH')
def setUp(self, mock_ssh):
self.ovs_deploy = model.OvsDeploy(mock_ssh, '/tmp/dpdk-devbind.py', {})
# See the License for the specific language governing permissions and
# limitations under the License.
-# Unittest for yardstick.benchmark.contexts.standalone.standaloneovs
-
-from __future__ import absolute_import
import os
-import unittest
+
import mock
+import unittest
from yardstick.benchmark.contexts.standalone import ovs_dpdk
NETWORKS = {
'mgmt': {'cidr': '152.16.100.10/24'},
'private_0': {
- 'phy_port': "0000:05:00.0",
- 'vpci': "0000:00:07.0",
- 'cidr': '152.16.100.10/24',
- 'interface': 'if0',
- 'mac': "00:00:00:00:00:01",
- 'vf_pci': {'vf_pci': 0},
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.0",
+ 'vpci': "0000:00:07.0",
+ 'cidr': '152.16.100.10/24',
+ 'interface': 'if0',
+ 'mac': "00:00:00:00:00:01",
+ 'vf_pci': {'vf_pci': 0},
+ 'gateway_ip': '152.16.100.20'},
'public_0': {
- 'phy_port': "0000:05:00.1",
- 'vpci': "0000:00:08.0",
- 'cidr': '152.16.40.10/24',
- 'interface': 'if0',
- 'vf_pci': {'vf_pci': 0},
- 'mac': "00:00:00:00:00:01",
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.1",
+ 'vpci': "0000:00:08.0",
+ 'cidr': '152.16.40.10/24',
+ 'interface': 'if0',
+ 'vf_pci': {'vf_pci': 0},
+ 'mac': "00:00:00:00:00:01",
+ 'gateway_ip': '152.16.100.20'},
}
def setUp(self):
self.ovs_dpdk = ovs_dpdk.OvsDpdkContext()
- @mock.patch('yardstick.benchmark.contexts.standalone.model.StandaloneContextHelper')
@mock.patch('yardstick.benchmark.contexts.standalone.model.Server')
+ @mock.patch('yardstick.benchmark.contexts.standalone.model.StandaloneContextHelper')
def test___init__(self, mock_helper, mock_server):
self.ovs_dpdk.helper = mock_helper
self.ovs_dpdk.vnf_node = mock_server
self.assertTrue(self.ovs_dpdk.first_run)
def test_init(self):
- self.ovs_dpdk.helper.parse_pod_file = mock.Mock(return_value=[{}, {}, {}])
+ self.ovs_dpdk.helper.parse_pod_file = mock.Mock(
+ return_value=[{}, {}, {}])
self.assertIsNone(self.ovs_dpdk.init(self.ATTRS))
def test_setup_ovs(self):
self.ovs_dpdk.wait_for_vswitchd = 0
self.assertIsNone(self.ovs_dpdk.setup_ovs_bridge_add_flows())
- def test_cleanup_ovs_dpdk_env(self):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "a", ""))
- ssh.return_value = ssh_mock
- self.ovs_dpdk.connection = ssh_mock
- self.ovs_dpdk.networks = self.NETWORKS
- self.ovs_dpdk.ovs_properties = {
- 'version': {'ovs': '2.7.0'}
- }
- self.ovs_dpdk.wait_for_vswitchd = 0
- self.assertIsNone(self.ovs_dpdk.cleanup_ovs_dpdk_env())
+ @mock.patch("yardstick.ssh.SSH")
+ def test_cleanup_ovs_dpdk_env(self, mock_ssh):
+ mock_ssh.execute.return_value = 0, "a", ""
+ self.ovs_dpdk.connection = mock_ssh
+ self.ovs_dpdk.networks = self.NETWORKS
+ self.ovs_dpdk.ovs_properties = {
+ 'version': {'ovs': '2.7.0'}
+ }
+ self.ovs_dpdk.wait_for_vswitchd = 0
+ self.assertIsNone(self.ovs_dpdk.cleanup_ovs_dpdk_env())
@mock.patch('yardstick.benchmark.contexts.standalone.model.OvsDeploy')
def test_check_ovs_dpdk_env(self, mock_ovs):
self.ovs_dpdk.connection = ssh_mock
self.ovs_dpdk.networks = self.NETWORKS
self.ovs_dpdk.ovs_properties = {
- 'version': {'ovs': '2.7.0', 'dpdk': '16.11.1'}
+ 'version': {'ovs': '2.7.0', 'dpdk': '16.11.1'}
}
self.ovs_dpdk.wait_for_vswitchd = 0
self.ovs_dpdk.cleanup_ovs_dpdk_env = mock.Mock()
self.ovs_dpdk.wait_for_vswitchd = 0
self.cleanup_ovs_dpdk_env = mock.Mock()
mock_ovs.deploy = mock.Mock()
+ # NOTE(elfoley): Check for a specific Exception
self.assertRaises(Exception, self.ovs_dpdk.check_ovs_dpdk_env)
@mock.patch('yardstick.ssh.SSH')
- def test_deploy(self, ssh_mock):
- ssh_mock.execute.return_value = (0, "a", "")
+ def test_deploy(self, mock_ssh):
+ mock_ssh.execute.return_value = 0, "a", ""
self.ovs_dpdk.vm_deploy = False
self.assertIsNone(self.ovs_dpdk.deploy())
@mock.patch('yardstick.benchmark.contexts.standalone.model.Libvirt')
@mock.patch('yardstick.ssh.SSH')
- def test_undeploy(self, ssh_mock, _):
- ssh_mock.execute.return_value = (0, "a", "")
+ def test_undeploy(self, mock_ssh, *args):
+ mock_ssh.execute.return_value = 0, "a", ""
self.ovs_dpdk.vm_deploy = False
self.assertIsNone(self.ovs_dpdk.undeploy())
self.ovs_dpdk.vm_deploy = True
+ self.ovs_dpdk.connection = mock_ssh
self.ovs_dpdk.vm_names = ['vm_0', 'vm_1']
- self.ovs_dpdk.connection = ssh_mock
self.ovs_dpdk.drivers = ['vm_0', 'vm_1']
self.ovs_dpdk.cleanup_ovs_dpdk_env = mock.Mock()
self.ovs_dpdk.networks = self.NETWORKS
'file': self._get_file_abspath(self.NODES_ovs_dpdk_SAMPLE)
}
- self.ovs_dpdk.helper.parse_pod_file = mock.Mock(return_value=[{}, {}, {}])
+ self.ovs_dpdk.helper.parse_pod_file = mock.Mock(
+ return_value=[{}, {}, {}])
self.ovs_dpdk.init(attrs)
attr_name = 'bar.foo'
self.assertEqual(result['user'], 'root')
self.assertEqual(result['key_filename'], '/root/.yardstick_key')
+ # TODO(elfoley): Split this test for networks that exist and networks that
+ # don't
def test__get_network(self):
network1 = {
'name': 'net_1',
'b': network2,
}
+ # Tests for networks that do not exist
attr_name = {}
self.assertIsNone(self.ovs_dpdk._get_network(attr_name))
self.assertIsNone(self.ovs_dpdk._get_network(None))
+ # TODO(elfoley): Split this test
attr_name = 'vld777'
self.assertIsNone(self.ovs_dpdk._get_network(attr_name))
+ # Tests for networks that exist
attr_name = {'vld_id': 'vld999'}
expected = {
"name": 'net_2',
self.ovs_dpdk.get_vf_datas = mock.Mock(return_value="")
self.assertIsNone(self.ovs_dpdk.configure_nics_for_ovs_dpdk())
- @mock.patch('yardstick.benchmark.contexts.standalone.model.Libvirt.add_ovs_interface')
- def test__enable_interfaces(self, _):
+ @mock.patch('yardstick.benchmark.contexts.standalone.ovs_dpdk.Libvirt')
+ def test__enable_interfaces(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
self.ovs_dpdk.drivers = []
self.ovs_dpdk.networks = self.NETWORKS
self.ovs_dpdk.get_vf_datas = mock.Mock(return_value="")
- self.assertIsNone(self.ovs_dpdk._enable_interfaces(0, ["private_0"], 'test'))
+ self.assertIsNone(self.ovs_dpdk._enable_interfaces(
+ 0, ["private_0"], 'test'))
- @mock.patch('yardstick.benchmark.contexts.standalone.ovs_dpdk.Libvirt')
@mock.patch('yardstick.benchmark.contexts.standalone.model.Server')
- def test_setup_ovs_dpdk_context(self, _, mock_libvirt):
+ @mock.patch('yardstick.benchmark.contexts.standalone.ovs_dpdk.Libvirt')
+ def test_setup_ovs_dpdk_context(self, mock_libvirt, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
self.ovs_dpdk.host_mgmt = {}
self.ovs_dpdk.flavor = {}
self.ovs_dpdk.configure_nics_for_ovs_dpdk = mock.Mock(return_value="")
- mock_libvirt.check_if_vm_exists_and_delete = mock.Mock(return_value="")
- mock_libvirt.build_vm_xml = mock.Mock(return_value=[6, "00:00:00:00:00:01"])
+ mock_libvirt.build_vm_xml.return_value = [6, "00:00:00:00:00:01"]
self.ovs_dpdk._enable_interfaces = mock.Mock(return_value="")
- mock_libvirt.virsh_create_vm = mock.Mock(return_value="")
- mock_libvirt.pin_vcpu_for_perf = mock.Mock(return_value="")
- self.ovs_dpdk.vnf_node.generate_vnf_instance = mock.Mock(return_value={})
+ mock_libvirt.virsh_create_vm.return_value = ""
+ mock_libvirt.pin_vcpu_for_perf.return_value = ""
+ self.ovs_dpdk.vnf_node.generate_vnf_instance = mock.Mock(
+ return_value={})
+
self.assertIsNotNone(self.ovs_dpdk.setup_ovs_dpdk_context())
# See the License for the specific language governing permissions and
# limitations under the License.
-# Unittest for yardstick.benchmark.contexts.standalone.standalonesriov
-
-from __future__ import absolute_import
import os
-import unittest
+
import mock
+import unittest
from yardstick import ssh
from yardstick.benchmark.contexts.standalone import sriov
NETWORKS = {
'mgmt': {'cidr': '152.16.100.10/24'},
'private_0': {
- 'phy_port': "0000:05:00.0",
- 'vpci': "0000:00:07.0",
- 'cidr': '152.16.100.10/24',
- 'interface': 'if0',
- 'mac': "00:00:00:00:00:01",
- 'vf_pci': {'vf_pci': 0},
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.0",
+ 'vpci': "0000:00:07.0",
+ 'cidr': '152.16.100.10/24',
+ 'interface': 'if0',
+ 'mac': "00:00:00:00:00:01",
+ 'vf_pci': {'vf_pci': 0},
+ 'gateway_ip': '152.16.100.20'},
'public_0': {
- 'phy_port': "0000:05:00.1",
- 'vpci': "0000:00:08.0",
- 'cidr': '152.16.40.10/24',
- 'interface': 'if0',
- 'vf_pci': {'vf_pci': 0},
- 'mac': "00:00:00:00:00:01",
- 'gateway_ip': '152.16.100.20'},
+ 'phy_port': "0000:05:00.1",
+ 'vpci': "0000:00:08.0",
+ 'cidr': '152.16.40.10/24',
+ 'interface': 'if0',
+ 'vf_pci': {'vf_pci': 0},
+ 'mac': "00:00:00:00:00:01",
+ 'gateway_ip': '152.16.100.20'},
}
def setUp(self):
self.sriov = sriov.SriovContext()
- @mock.patch('yardstick.benchmark.contexts.standalone.model.StandaloneContextHelper')
@mock.patch('yardstick.benchmark.contexts.standalone.sriov.Libvirt')
+ @mock.patch('yardstick.benchmark.contexts.standalone.model.StandaloneContextHelper')
@mock.patch('yardstick.benchmark.contexts.standalone.model.Server')
- def test___init__(self, mock_helper, mock_libvirt, mock_server):
- # pylint: disable=unused-argument
+ def test___init__(self, mock_helper, mock_server, *args):
# NOTE(ralonsoh): this test doesn't cover function execution.
- # The pylint exception should be removed.
self.sriov.helper = mock_helper
self.sriov.vnf_node = mock_server
self.assertIsNone(self.sriov.file_path)
self.assertIsNone(self.sriov.init(self.ATTRS))
@mock.patch.object(ssh, 'SSH', return_value=(0, "a", ""))
- def test_deploy(self, mock_ssh):
- # pylint: disable=unused-argument
+ def test_deploy(self, *args):
# NOTE(ralonsoh): this test doesn't cover function execution.
- # The pylint exception should be removed.
self.sriov.vm_deploy = False
self.assertIsNone(self.sriov.deploy())
self.sriov.wait_for_vnfs_to_start = mock.Mock(return_value={})
self.assertIsNone(self.sriov.deploy())
- @mock.patch.object(ssh, 'SSH', return_value=(0, "a", ""))
@mock.patch('yardstick.benchmark.contexts.standalone.sriov.Libvirt')
- def test_undeploy(self, mock_libvirt, mock_ssh):
- # pylint: disable=unused-argument
- # NOTE(ralonsoh): the pylint exception should be removed.
+ @mock.patch.object(ssh, 'SSH', return_value=(0, "a", ""))
+ def test_undeploy(self, mock_ssh, *args):
self.sriov.vm_deploy = False
self.assertIsNone(self.sriov.undeploy())
self.assertEqual(result['user'], 'root')
self.assertEqual(result['key_filename'], '/root/.yardstick_key')
+ # TODO(elfoley): Split this test
+ # There are at least two sets of inputs/outputs
def test__get_network(self):
network1 = {
'name': 'net_1',
self.sriov._get_vf_data = mock.Mock(return_value="")
self.assertIsNone(self.sriov.configure_nics_for_sriov())
- @mock.patch.object(ssh, 'SSH', return_value=(0, "a", ""))
@mock.patch('yardstick.benchmark.contexts.standalone.sriov.Libvirt')
- def test__enable_interfaces(self, mock_libvirt, mock_ssh):
- # pylint: disable=unused-argument
- # NOTE(ralonsoh): the pylint exception should be removed.
+ @mock.patch.object(ssh, 'SSH')
+ def test__enable_interfaces(self, mock_ssh, *args):
+ mock_ssh.return_value = 0, "a", ""
+
self.sriov.vm_deploy = True
self.sriov.connection = mock_ssh
self.sriov.vm_names = ['vm_0', 'vm_1']
self.sriov.drivers = []
self.sriov.networks = self.NETWORKS
- self.sriov._get_vf_data = mock.Mock(return_value="")
- self.assertIsNone(self.sriov._enable_interfaces(0, 0, ["private_0"], 'test'))
+ self.sriov.get_vf_data = mock.Mock(return_value="")
+ self.assertIsNone(self.sriov._enable_interfaces(
+ 0, 0, ["private_0"], 'test'))
@mock.patch('yardstick.benchmark.contexts.standalone.model.Server')
@mock.patch('yardstick.benchmark.contexts.standalone.sriov.Libvirt')
- def test_setup_sriov_context(self, mock_libvirt, mock_server):
- # pylint: disable=unused-argument
- # NOTE(ralonsoh): the pylint exception should be removed.
+ def test_setup_sriov_context(self, mock_libvirt, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
self.sriov.host_mgmt = {}
self.sriov.flavor = {}
self.sriov.configure_nics_for_sriov = mock.Mock(return_value="")
- mock_libvirt.build_vm_xml = mock.Mock(return_value=[6, "00:00:00:00:00:01"])
+ mock_libvirt.build_vm_xml = mock.Mock(
+ return_value=[6, "00:00:00:00:00:01"])
self.sriov._enable_interfaces = mock.Mock(return_value="")
self.sriov.vnf_node.generate_vnf_instance = mock.Mock(return_value={})
self.assertIsNotNone(self.sriov.setup_sriov_context())
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.contexts.heat
-
-from __future__ import absolute_import
-
+from collections import OrderedDict
+from itertools import count
import logging
import os
-import unittest
import uuid
-from collections import OrderedDict
import mock
+import unittest
-from itertools import count
from yardstick.benchmark.contexts import heat
from yardstick.benchmark.contexts import model
+
LOG = logging.getLogger(__name__)
pgs['pgrp1']['policy'])
mock_sg.assert_called_with('servergroup1', self.test_context,
sgs['servergroup1']['policy'])
- self.assertTrue(len(self.test_context.placement_groups) == 1)
- self.assertTrue(len(self.test_context.server_groups) == 1)
+ self.assertEqual(len(self.test_context.placement_groups), 1)
+ self.assertEqual(len(self.test_context.server_groups), 1)
mock_network.assert_called_with(
'bar', self.test_context, networks['bar'])
- self.assertTrue(len(self.test_context.networks) == 1)
+ self.assertEqual(len(self.test_context.networks), 1)
mock_server.assert_called_with('baz', self.test_context,
servers['baz'])
- self.assertTrue(len(self.test_context.servers) == 1)
+ self.assertEqual(len(self.test_context.servers), 1)
if os.path.exists(self.test_context.key_filename):
try:
"2f2e4997-0a8e-4eb7-9fa4-f3f8fbbc393b")
mock_template.add_security_group.assert_called_with("foo-secgroup")
# mock_template.add_network.assert_called_with("bar-fool-network", 'physnet1', None)
- mock_template.add_router.assert_called_with("bar-fool-network-router",
- netattrs["external_network"],
- "bar-fool-network-subnet")
- mock_template.add_router_interface.assert_called_with("bar-fool-network-router-if0",
- "bar-fool-network-router",
- "bar-fool-network-subnet")
+ mock_template.add_router.assert_called_with(
+ "bar-fool-network-router",
+ netattrs["external_network"],
+ "bar-fool-network-subnet")
+ mock_template.add_router_interface.assert_called_with(
+ "bar-fool-network-router-if0",
+ "bar-fool-network-router",
+ "bar-fool-network-subnet")
@mock.patch('yardstick.benchmark.contexts.heat.HeatTemplate')
- def test_attrs_get(self, mock_template):
+ def test_attrs_get(self, *args):
image, flavor, user = expected_tuple = 'foo1', 'foo2', 'foo3'
self.assertNotEqual(self.test_context.image, image)
self.assertNotEqual(self.test_context.flavor, flavor)
self.assertEqual(attr_tuple, expected_tuple)
@mock.patch('yardstick.benchmark.contexts.heat.HeatTemplate')
- def test_attrs_set_negative(self, mock_template):
+ def test_attrs_set_negative(self, *args):
with self.assertRaises(AttributeError):
self.test_context.image = 'foo'
@mock.patch('yardstick.benchmark.contexts.heat.HeatTemplate')
@mock.patch('yardstick.benchmark.contexts.heat.os')
- def test_undeploy_key_filename(self, mock_template, mock_os):
+ def test_undeploy_key_filename(self, mock_os, mock_template):
self.test_context.stack = mock_template
mock_os.path.exists.return_value = True
self.assertIsNone(self.test_context.undeploy())
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_found_dict(self, mock_pkg_resources):
+ def test__get_server_found_dict(self, *args):
"""
Use HeatContext._get_server to get a server that matches
based on a dictionary input.
self.assertEqual(result['private_ip'], '10.0.0.1')
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_found_dict_no_attrs(self, mock_pkg_resources):
+ def test__get_server_found_dict_no_attrs(self, *args):
"""
Use HeatContext._get_server to get a server that matches
based on a dictionary input.
self.assertNotIn('ip', result)
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_found_not_dict(self, mock_pkg_resources):
+ def test__get_server_found_not_dict(self, *args):
"""
Use HeatContext._get_server to get a server that matches
based on a non-dictionary input
self.assertNotIn('public_ip', result)
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_none_found_not_dict(self, mock_pkg_resources):
+ def test__get_server_none_found_not_dict(self, *args):
"""
Use HeatContext._get_server to not get a server due to
None value associated with the match to a non-dictionary
self.assertIsNone(result)
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_not_found_dict(self, mock_pkg_resources):
+ def test__get_server_not_found_dict(self, *args):
"""
Use HeatContext._get_server to not get a server for lack
of a match to a dictionary input
self.assertIsNone(result)
@mock.patch("yardstick.benchmark.contexts.heat.pkg_resources")
- def test__get_server_not_found_not_dict(self, mock_pkg_resources):
+ def test__get_server_not_found_not_dict(self, *args):
"""
Use HeatContext._get_server to not get a server for lack
of a match to a non-dictionary input
result = self.test_context._get_server(attr_name)
self.assertIsNone(result)
+ # TODO: Split this into more granular tests
def test__get_network(self):
network1 = mock.MagicMock()
network1.name = 'net_1'
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.contexts.kubernetes
-
-from __future__ import absolute_import
-import unittest
import mock
+import unittest
from yardstick.benchmark.contexts.base import Context
-from yardstick.benchmark.contexts.kubernetes import KubernetesContext
+from yardstick.benchmark.contexts import kubernetes
context_cfg = {
class KubernetesTestCase(unittest.TestCase):
+ def setUp(self):
+ self.k8s_context = kubernetes.KubernetesContext()
+ self.k8s_context.init(context_cfg)
+
def tearDown(self):
# clear kubernetes contexts from global list so we don't break other tests
Context.list = []
- @mock.patch('{}.KubernetesContext._delete_services'.format(prefix))
- @mock.patch('{}.KubernetesContext._delete_ssh_key'.format(prefix))
- @mock.patch('{}.KubernetesContext._delete_rcs'.format(prefix))
- @mock.patch('{}.KubernetesContext._delete_pods'.format(prefix))
+ @mock.patch.object(kubernetes.KubernetesContext, '_delete_services')
+ @mock.patch.object(kubernetes.KubernetesContext, '_delete_ssh_key')
+ @mock.patch.object(kubernetes.KubernetesContext, '_delete_rcs')
+ @mock.patch.object(kubernetes.KubernetesContext, '_delete_pods')
def test_undeploy(self,
mock_delete_pods,
mock_delete_rcs,
mock_delete_ssh,
mock_delete_services):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context.undeploy()
+ self.k8s_context.undeploy()
self.assertTrue(mock_delete_ssh.called)
self.assertTrue(mock_delete_rcs.called)
self.assertTrue(mock_delete_pods.called)
self.assertTrue(mock_delete_services.called)
- @mock.patch('{}.KubernetesContext._create_services'.format(prefix))
- @mock.patch('{}.KubernetesContext._wait_until_running'.format(prefix))
- @mock.patch('{}.KubernetesTemplate.get_rc_pods'.format(prefix))
- @mock.patch('{}.KubernetesContext._create_rcs'.format(prefix))
- @mock.patch('{}.KubernetesContext._set_ssh_key'.format(prefix))
+ @mock.patch.object(kubernetes.KubernetesContext, '_create_services')
+ @mock.patch.object(kubernetes.KubernetesContext, '_wait_until_running')
+ @mock.patch.object(kubernetes.KubernetesTemplate, 'get_rc_pods')
+ @mock.patch.object(kubernetes.KubernetesContext, '_create_rcs')
+ @mock.patch.object(kubernetes.KubernetesContext, '_set_ssh_key')
def test_deploy(self,
mock_set_ssh_key,
mock_create_rcs,
mock_wait_until_running,
mock_create_services):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
with mock.patch("yardstick.benchmark.contexts.kubernetes.time"):
- k8s_context.deploy()
+ self.k8s_context.deploy()
self.assertTrue(mock_set_ssh_key.called)
self.assertTrue(mock_create_rcs.called)
self.assertTrue(mock_create_services.called)
self.assertTrue(mock_get_rc_pods.called)
self.assertTrue(mock_wait_until_running.called)
- @mock.patch('{}.paramiko'.format(prefix), **{"resource_filename.return_value": ""})
- @mock.patch('{}.pkg_resources'.format(prefix), **{"resource_filename.return_value": ""})
- @mock.patch('{}.utils'.format(prefix))
- @mock.patch('{}.open'.format(prefix), create=True)
- @mock.patch('{}.k8s_utils.delete_config_map'.format(prefix))
- @mock.patch('{}.k8s_utils.create_config_map'.format(prefix))
- def test_ssh_key(self, mock_create, mock_delete, mock_open, mock_utils, mock_resources,
- mock_paramiko):
-
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._set_ssh_key()
- k8s_context._delete_ssh_key()
+ @mock.patch.object(kubernetes, 'paramiko', **{"resource_filename.return_value": ""})
+ @mock.patch.object(kubernetes, 'pkg_resources', **{"resource_filename.return_value": ""})
+ @mock.patch.object(kubernetes, 'utils')
+ @mock.patch.object(kubernetes, 'open', create=True)
+ @mock.patch.object(kubernetes.k8s_utils, 'delete_config_map')
+ @mock.patch.object(kubernetes.k8s_utils, 'create_config_map')
+ def test_ssh_key(self, mock_create, mock_delete, *args):
+ self.k8s_context._set_ssh_key()
+ self.k8s_context._delete_ssh_key()
+
self.assertTrue(mock_create.called)
self.assertTrue(mock_delete.called)
- @mock.patch('{}.k8s_utils.read_pod_status'.format(prefix))
+ @mock.patch.object(kubernetes.k8s_utils, 'read_pod_status')
def test_wait_until_running(self, mock_read_pod_status):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context.template.pods = ['server']
+ self.k8s_context.template.pods = ['server']
mock_read_pod_status.return_value = 'Running'
- k8s_context._wait_until_running()
+ self.k8s_context._wait_until_running()
- @mock.patch('{}.k8s_utils.get_pod_by_name'.format(prefix))
- @mock.patch('{}.KubernetesContext._get_node_ip'.format(prefix))
- @mock.patch('{}.k8s_utils.get_service_by_name'.format(prefix))
+ @mock.patch.object(kubernetes.k8s_utils, 'get_pod_by_name')
+ @mock.patch.object(kubernetes.KubernetesContext, '_get_node_ip')
+ @mock.patch.object(kubernetes.k8s_utils, 'get_service_by_name')
def test_get_server(self,
mock_get_service_by_name,
mock_get_node_ip,
def __init__(self):
self.status = Status()
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
-
mock_get_service_by_name.return_value = Services()
mock_get_pod_by_name.return_value = Pod()
mock_get_node_ip.return_value = '172.16.10.131'
- server = k8s_context._get_server('server')
- self.assertIsNotNone(server)
+ self.assertIsNotNone(self.k8s_context._get_server('server'))
- @mock.patch('{}.KubernetesContext._create_rc'.format(prefix))
+ @mock.patch.object(kubernetes.KubernetesContext, '_create_rc')
def test_create_rcs(self, mock_create_rc):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._create_rcs()
+ self.k8s_context._create_rcs()
self.assertTrue(mock_create_rc.called)
- @mock.patch('{}.k8s_utils.create_replication_controller'.format(prefix))
+ @mock.patch.object(kubernetes.k8s_utils, 'create_replication_controller')
def test_create_rc(self, mock_create_replication_controller):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._create_rc({})
+ self.k8s_context._create_rc({})
self.assertTrue(mock_create_replication_controller.called)
- @mock.patch('{}.KubernetesContext._delete_rc'.format(prefix))
+ @mock.patch.object(kubernetes.KubernetesContext, '_delete_rc')
def test_delete_rcs(self, mock_delete_rc):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._delete_rcs()
+ self.k8s_context._delete_rcs()
self.assertTrue(mock_delete_rc.called)
- @mock.patch('{}.k8s_utils.delete_replication_controller'.format(prefix))
+ @mock.patch.object(kubernetes.k8s_utils, 'delete_replication_controller')
def test_delete_rc(self, mock_delete_replication_controller):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._delete_rc({})
+ self.k8s_context._delete_rc({})
self.assertTrue(mock_delete_replication_controller.called)
- @mock.patch('{}.k8s_utils.get_node_list'.format(prefix))
+ @mock.patch.object(kubernetes.k8s_utils, 'get_node_list')
def test_get_node_ip(self, mock_get_node_list):
-
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._get_node_ip()
+ self.k8s_context._get_node_ip()
self.assertTrue(mock_get_node_list.called)
@mock.patch('yardstick.orchestrator.kubernetes.ServiceObject.create')
def test_create_services(self, mock_create):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._create_services()
+ self.k8s_context._create_services()
self.assertTrue(mock_create.called)
@mock.patch('yardstick.orchestrator.kubernetes.ServiceObject.delete')
def test_delete_services(self, mock_delete):
- k8s_context = KubernetesContext()
- k8s_context.init(context_cfg)
- k8s_context._delete_services()
+ self.k8s_context._delete_services()
self.assertTrue(mock_delete.called)
def test_find_external_network(self):
mock_network = mock.Mock()
- mock_network.router = mock.Mock()
+ mock_network.router = mock.Mock() #pylint ignore assignment-from-none
mock_network.router.external_gateway_info = 'ext_net'
model.Network.list = [mock_network]
from yardstick.benchmark.contexts import node
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
class NodeContextTestCase(unittest.TestCase):
PREFIX = 'yardstick.benchmark.contexts.node'
result = self.test_context.get_context_from_server('my.vnf1')
self.assertIs(result, self.test_context)
+ # TODO: Split this into more granular tests
def test__get_network(self):
network1 = {
'name': 'net_1',
# self.input_file = ('plugin/sample_config.yaml',)
self.input_file = [
os.path.join(os.path.abspath(
- dirname(dirname(dirname(dirname(dirname(__file__)))))),
+ dirname(dirname(dirname(dirname(dirname(dirname(__file__))))))),
'plugin/sample_config.yaml')]
self.assertEqual(1, mock_tasks.call_count)
self.assertEqual(1, mock_keys.call_count)
+ # pylint: disable=deprecated-method
def test_invalid_yaml_name(self):
self.assertRaisesRegexp(ValueError, "yaml*", self.rep._validate,
'F@KE_NAME', FAKE_TASK_ID)
+ # pylint: disable=deprecated-method
def test_invalid_task_id(self):
self.assertRaisesRegexp(ValueError, "task*", self.rep._validate,
FAKE_YAML_NAME, DUMMY_TASK_ID)
mock_query.return_value = []
self.rep.yaml_name = FAKE_YAML_NAME
self.rep.task_id = FAKE_TASK_ID
+ # pylint: disable=deprecated-method
self.assertRaisesRegexp(KeyError, "Task ID", self.rep._get_fieldkeys)
self.assertRaisesRegexp(KeyError, "Task ID", self.rep._get_tasks)
+ # pylint: enable=deprecated-method
from yardstick.common import constants as consts
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
class TaskTestCase(unittest.TestCase):
@mock.patch('yardstick.benchmark.core.task.Context')
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-from __future__ import print_function
-from __future__ import absolute_import
-
-import unittest
import time
-from mock import mock
+import mock
+import unittest
from yardstick.benchmark.runners import base
-from yardstick.benchmark.runners.iteration import IterationRunner
+from yardstick.benchmark.runners import iteration
class ActionTestCase(unittest.TestCase):
class RunnerTestCase(unittest.TestCase):
+ def setUp(self):
+ self.runner = iteration.IterationRunner({})
+
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
- def test_get_output(self, mock_process):
- runner = IterationRunner({})
- runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
- runner.output_queue.put({'criteria': 'PASS'})
+ def test_get_output(self, *args):
+ self.runner.output_queue.put({'case': 'opnfv_yardstick_tc002'})
+ self.runner.output_queue.put({'criteria': 'PASS'})
idle_result = {
'case': 'opnfv_yardstick_tc002',
'criteria': 'PASS'
}
- for retries in range(1000):
+ for _ in range(1000):
time.sleep(0.01)
- if not runner.output_queue.empty():
+ if not self.runner.output_queue.empty():
break
- actual_result = runner.get_output()
+ actual_result = self.runner.get_output()
self.assertEqual(idle_result, actual_result)
@mock.patch("yardstick.benchmark.runners.iteration.multiprocessing")
- def test_get_result(self, mock_process):
- runner = IterationRunner({})
- runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
- runner.result_queue.put({'criteria': 'PASS'})
+ def test_get_result(self, *args):
+ self.runner.result_queue.put({'case': 'opnfv_yardstick_tc002'})
+ self.runner.result_queue.put({'criteria': 'PASS'})
idle_result = [
{'case': 'opnfv_yardstick_tc002'},
{'criteria': 'PASS'}
]
- for retries in range(1000):
+ for _ in range(1000):
time.sleep(0.01)
- if not runner.result_queue.empty():
+ if not self.runner.result_queue.empty():
break
- actual_result = runner.get_result()
+ actual_result = self.runner.get_result()
self.assertEqual(idle_result, actual_result)
def test__run_benchmark(self):
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
-from __future__ import absolute_import
+import time
-import unittest
import mock
+import unittest
-from tests.unit import STL_MOCKS
+from yardstick.tests.unit import STL_MOCKS
STLClient = mock.MagicMock()
stl_patch = mock.patch.dict("sys.modules", STL_MOCKS)
benchmark = cls()
method = getattr(benchmark, 'my_method')
- helper = SearchRunnerHelper(cls, 'my_method', scenario_cfg, {}, aborted)
+ helper = SearchRunnerHelper(
+ cls, 'my_method', scenario_cfg, {}, aborted)
with helper.get_benchmark_instance():
helper()
'runner': {},
}
- helper = SearchRunnerHelper(cls, 'my_method', scenario_cfg, {}, aborted)
+ helper = SearchRunnerHelper(
+ cls, 'my_method', scenario_cfg, {}, aborted)
with self.assertRaises(RuntimeError):
helper()
- @mock.patch('yardstick.benchmark.runners.search.time')
- def test_is_not_done(self, mock_time):
+ @mock.patch.object(time, 'sleep')
+ @mock.patch.object(time, 'time')
+ def test_is_not_done(self, mock_time, *args):
cls = mock.MagicMock()
aborted = mock.MagicMock()
scenario_cfg = {
'runner': {},
}
- mock_time.time.side_effect = range(1000)
+ mock_time.side_effect = range(1000)
- helper = SearchRunnerHelper(cls, 'my_method', scenario_cfg, {}, aborted)
+ helper = SearchRunnerHelper(
+ cls, 'my_method', scenario_cfg, {}, aborted)
index = -1
for index in helper.is_not_done():
self.assertGreaterEqual(index, 10)
- @mock.patch('yardstick.benchmark.runners.search.time')
- def test_is_not_done_immediate_stop(self, mock_time):
+ @mock.patch.object(time, 'sleep')
+ def test_is_not_done_immediate_stop(self, *args):
cls = mock.MagicMock()
aborted = mock.MagicMock()
scenario_cfg = {
},
}
- helper = SearchRunnerHelper(cls, 'my_method', scenario_cfg, {}, aborted)
+ helper = SearchRunnerHelper(
+ cls, 'my_method', scenario_cfg, {}, aborted)
index = -1
for index in helper.is_not_done():
self.assertEqual(index, -1)
+
class TestSearchRunner(unittest.TestCase):
def test__worker_run_once(self):
attacker_baremetal
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.attacker.attacker_baremetal.subprocess')
class ExecuteShellTestCase(unittest.TestCase):
def test__fun_execute_shell_command_successful(self, mock_subprocess):
cmd = "env"
mock_subprocess.check_output.return_value = (0, 'unittest')
- exitcode, output = attacker_baremetal._execute_shell_command(cmd)
+ exitcode, _ = attacker_baremetal._execute_shell_command(cmd)
self.assertEqual(exitcode, 0)
@mock.patch('yardstick.benchmark.scenarios.availability.attacker.attacker_baremetal.LOG')
def test__fun_execute_shell_command_fail_cmd_exception(self, mock_log, mock_subprocess):
cmd = "env"
mock_subprocess.check_output.side_effect = RuntimeError
- exitcode, output = attacker_baremetal._execute_shell_command(cmd)
+ exitcode, _ = attacker_baremetal._execute_shell_command(cmd)
self.assertEqual(exitcode, -1)
mock_log.error.assert_called_once()
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for
-# yardstick.benchmark.scenarios.availability.monitor.monitor_command
-
-from __future__ import absolute_import
import mock
import unittest
from yardstick.benchmark.scenarios.availability.monitor import basemonitor
-@mock.patch(
- 'yardstick.benchmark.scenarios.availability.monitor.basemonitor'
- '.BaseMonitor')
class MonitorMgrTestCase(unittest.TestCase):
def setUp(self):
for mo in self.monitor_list:
mo._result = {"outage_time": 10}
- def test__MonitorMgr_setup_successful(self, mock_monitor):
+ @mock.patch.object(basemonitor, 'BaseMonitor')
+ def test__MonitorMgr_setup_successful(self, *args):
instance = basemonitor.MonitorMgr({"nova-api": 10})
instance.init_monitors(self.monitor_configs, None)
instance.start_monitors()
instance.wait_monitors()
- ret = instance.verify_SLA()
+ # TODO(elfoley): Check the return value
+ ret = instance.verify_SLA() # pylint: disable=unused-variable
- def test_MonitorMgr_getitem(self, mock_monitor):
+ @mock.patch.object(basemonitor, 'BaseMonitor')
+ def test_MonitorMgr_getitem(self, *args):
monitorMgr = basemonitor.MonitorMgr({"nova-api": 10})
monitorMgr.init_monitors(self.monitor_configs, None)
- def test_store_result(self, mock_monitor):
+ @mock.patch.object(basemonitor, 'BaseMonitor')
+ def test_store_result(self, *args):
expect = {'process_neutron-server_outage_time': 10,
'openstack-router-list_outage_time': 10}
result = {}
ins.run()
ins.verify_SLA()
- @mock.patch(
- 'yardstick.benchmark.scenarios.availability.monitor.basemonitor'
- '.multiprocessing')
+ @mock.patch.object(basemonitor, 'multiprocessing')
def test__basemonitor_func_false(self, mock_multiprocess):
ins = self.MonitorSimple(self.monitor_cfg, None, {"nova-api": 10})
ins.setup()
ins.run()
ins.verify_SLA()
+ # TODO(elfoley): fix this test to not throw an error
def test__basemonitor_getmonitorcls_successfule(self):
cls = None
try:
cls = basemonitor.BaseMonitor.get_monitor_cls(self.monitor_cfg)
- except Exception:
+ except Exception: # pylint: disable=broad-except
pass
self.assertIsNone(cls)
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2016 Huan Li and others
# lihuansse@tongji.edu.cn
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for
-# yardstick.benchmark.scenarios.availability.operation.baseoperation
-
-from __future__ import absolute_import
import mock
import unittest
from yardstick.benchmark.scenarios.availability.operation import baseoperation
-@mock.patch(
- 'yardstick.benchmark.scenarios.availability.operation.baseoperation'
- '.BaseOperation')
class OperationMgrTestCase(unittest.TestCase):
def setUp(self):
self.operation_configs = []
self.operation_configs.append(config)
- def test_all_successful(self, mock_operation):
+ @mock.patch.object(baseoperation, 'BaseOperation')
+ def test_all_successful(self, *args):
mgr_ins = baseoperation.OperationMgr()
mgr_ins.init_operations(self.operation_configs, None)
- operation_ins = mgr_ins["service-status"]
+ _ = mgr_ins["service-status"]
mgr_ins.rollback()
- def test_getitem_fail(self, mock_operation):
+ @mock.patch.object(baseoperation, 'BaseOperation')
+ def test_getitem_fail(self, *args):
mgr_ins = baseoperation.OperationMgr()
mgr_ins.init_operations(self.operation_configs, None)
with self.assertRaises(KeyError):
- operation_ins = mgr_ins["operation-not-exist"]
+ _ = mgr_ins["operation-not-exist"]
class TestOperation(baseoperation.BaseOperation):
'operation_type': 'general-operation',
'key': 'service-status'
}
+ self.base_ins = baseoperation.BaseOperation(self.config, None)
def test_all_successful(self):
- base_ins = baseoperation.BaseOperation(self.config, None)
- base_ins.setup()
- base_ins.run()
- base_ins.rollback()
+ self.base_ins.setup()
+ self.base_ins.run()
+ self.base_ins.rollback()
def test_get_script_fullpath(self):
- base_ins = baseoperation.BaseOperation(self.config, None)
- base_ins.get_script_fullpath("ha_tools/test.bash")
+ self.base_ins.get_script_fullpath("ha_tools/test.bash")
+ # TODO(elfoley): Fix test to check on expected outputs
+ # pylint: disable=unused-variable
def test_get_operation_cls_successful(self):
- base_ins = baseoperation.BaseOperation(self.config, None)
- operation_ins = base_ins.get_operation_cls("test-operation")
+ operation_ins = self.base_ins.get_operation_cls("test-operation")
def test_get_operation_cls_fail(self):
- base_ins = baseoperation.BaseOperation(self.config, None)
with self.assertRaises(RuntimeError):
- operation_ins = base_ins.get_operation_cls("operation-not-exist")
+ self.base_ins.get_operation_cls("operation-not-exist")
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.availability.result_checker
-# .baseresultchecker
-
-from __future__ import absolute_import
import mock
import unittest
baseresultchecker
-@mock.patch('yardstick.benchmark.scenarios.availability.result_checker'
- '.baseresultchecker.BaseResultChecker')
class ResultCheckerMgrTestCase(unittest.TestCase):
def setUp(self):
self.checker_configs = []
self.checker_configs.append(config)
- def test_ResultCheckerMgr_setup_successful(self, mock_basechacer):
- mgr_ins = baseresultchecker.ResultCheckerMgr()
- mgr_ins.init_ResultChecker(self.checker_configs, None)
- mgr_ins.verify()
+ self.mgr_ins = baseresultchecker.ResultCheckerMgr()
+
+ self._mock_basechecker = mock.patch.object(baseresultchecker,
+ 'BaseResultChecker')
+ self.mock_basechecker = self._mock_basechecker.start()
+ self.addCleanup(self._stop_mock)
+
+ def _stop_mock(self):
+ self._mock_basechecker.stop()
+
+ def test_ResultCheckerMgr_setup_successful(self):
+ self.mgr_ins.verify()
- def test_getitem_succeessful(self, mock_basechacer):
- mgr_ins = baseresultchecker.ResultCheckerMgr()
- mgr_ins.init_ResultChecker(self.checker_configs, None)
- checker_ins = mgr_ins["process-checker"]
+ def test_getitem_succeessful(self):
+ self.mgr_ins.init_ResultChecker(self.checker_configs, None)
+ _ = self.mgr_ins["process-checker"]
- def test_getitem_fail(self, mock_basechacer):
- mgr_ins = baseresultchecker.ResultCheckerMgr()
- mgr_ins.init_ResultChecker(self.checker_configs, None)
+ def test_getitem_fail(self):
+ self.mgr_ins.init_ResultChecker(self.checker_configs, None)
with self.assertRaises(KeyError):
- checker_ins = mgr_ins["checker-not-exist"]
+ _ = self.mgr_ins["checker-not-exist"]
class BaseResultCheckerTestCase(unittest.TestCase):
'checker_type': 'general-result-checker',
'key': 'process-checker'
}
+ self.ins = baseresultchecker.BaseResultChecker(self.checker_cfg, None)
def test_baseresultchecker_setup_verify_successful(self):
- ins = baseresultchecker.BaseResultChecker(self.checker_cfg, None)
- ins.setup()
- ins.verify()
+ self.ins.setup()
+ self.ins.verify()
def test_baseresultchecker_verfiy_pass(self):
- ins = baseresultchecker.BaseResultChecker(self.checker_cfg, None)
- ins.setup()
- ins.actualResult = True
- ins.expectedResult = True
- ins.verify()
+ self.ins.setup()
+ self.ins.actualResult = True
+ self.ins.expectedResult = True
+ self.ins.verify()
def test_get_script_fullpath(self):
- ins = baseresultchecker.BaseResultChecker(self.checker_cfg, None)
- path = ins.get_script_fullpath("test.bash")
+ self.ins.get_script_fullpath("test.bash")
def test_get_resultchecker_cls_successful(self):
baseresultchecker.BaseResultChecker.get_resultchecker_cls(
from yardstick.benchmark.scenarios.availability.director import Director
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.director.basemonitor')
@mock.patch('yardstick.benchmark.scenarios.availability.director.baseattacker')
@mock.patch(
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for
-# yardstick.benchmark.scenarios.availability.monitor.monitor_command
-
-from __future__ import absolute_import
import mock
import unittest
from yardstick.benchmark.scenarios.availability.monitor import monitor_command
-@mock.patch('subprocess.check_output')
class ExecuteShellTestCase(unittest.TestCase):
- def test__fun_execute_shell_command_successful(self, mock_subprocess_check_output):
+ def setUp(self):
+ self._mock_subprocess = mock.patch.object(monitor_command, 'subprocess')
+ self.mock_subprocess = self._mock_subprocess.start()
+ self.addCleanup(self._stop_mock)
+
+ def _stop_mock(self):
+ self._mock_subprocess.stop()
+
+ def test__fun_execute_shell_command_successful(self):
cmd = "env"
- mock_subprocess_check_output.return_value = (0, 'unittest')
- exitcode, _ = monitor_command._execute_shell_command(cmd)
+ self.mock_subprocess.check_output.return_value = (0, 'unittest')
+ exitcode, _t = monitor_command._execute_shell_command(cmd)
self.assertEqual(exitcode, 0)
- @mock.patch('yardstick.benchmark.scenarios.availability.monitor.monitor_command.LOG')
- def test__fun_execute_shell_command_fail_cmd_exception(self, mock_log,
- mock_subprocess_check_output):
+ @mock.patch.object(monitor_command, 'LOG')
+ def test__fun_execute_shell_command_fail_cmd_exception(self, mock_log):
cmd = "env"
- mock_subprocess_check_output.side_effect = RuntimeError
+ self.mock_subprocess.check_output.side_effect = RuntimeError
exitcode, _ = monitor_command._execute_shell_command(cmd)
self.assertEqual(exitcode, -1)
mock_log.error.assert_called_once()
-@mock.patch('subprocess.check_output')
class MonitorOpenstackCmdTestCase(unittest.TestCase):
def setUp(self):
'monitor_time': 1,
'sla': {'max_outage_time': 5}
}
+ self._mock_subprocess = mock.patch.object(monitor_command, 'subprocess')
+ self.mock_subprocess = self._mock_subprocess.start()
+ self.addCleanup(self._stop_mock)
+
+ def _stop_mock(self):
+ self._mock_subprocess.stop()
- def test__monitor_command_monitor_func_successful(self, mock_subprocess_check_output):
+ def test__monitor_command_monitor_func_successful(self):
instance = monitor_command.MonitorOpenstackCmd(self.config, None, {"nova-api": 10})
instance.setup()
- mock_subprocess_check_output.return_value = (0, 'unittest')
+ self.mock_subprocess.check_output.return_value = (0, 'unittest')
ret = instance.monitor_func()
self.assertTrue(ret)
instance._result = {"outage_time": 0}
instance.verify_SLA()
- @mock.patch('yardstick.benchmark.scenarios.availability.monitor.monitor_command.LOG')
- def test__monitor_command_monitor_func_failure(self, mock_log, mock_subprocess_check_output):
- mock_subprocess_check_output.return_value = (1, 'unittest')
+ @mock.patch.object(monitor_command, 'LOG')
+ def test__monitor_command_monitor_func_failure(self, mock_log):
+ self.mock_subprocess.check_output.return_value = (1, 'unittest')
instance = monitor_command.MonitorOpenstackCmd(self.config, None, {"nova-api": 10})
instance.setup()
- mock_subprocess_check_output.side_effect = RuntimeError
+ self.mock_subprocess.check_output.side_effect = RuntimeError
ret = instance.monitor_func()
self.assertFalse(ret)
mock_log.error.assert_called_once()
instance._result = {"outage_time": 10}
instance.verify_SLA()
- @mock.patch(
- 'yardstick.benchmark.scenarios.availability.monitor.monitor_command'
- '.ssh')
- def test__monitor_command_ssh_monitor_successful(self, mock_ssh, mock_subprocess_check_output):
+ @mock.patch.object(monitor_command, 'ssh')
+ def test__monitor_command_ssh_monitor_successful(self, mock_ssh):
- mock_subprocess_check_output.return_value = (0, 'unittest')
+ self.mock_subprocess.check_output.return_value = (0, 'unittest')
self.config["host"] = "node1"
instance = monitor_command.MonitorOpenstackCmd(
self.config, self.context, {"nova-api": 10})
from yardstick.benchmark.scenarios.availability.monitor import monitor_general
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.monitor.'
'monitor_general.ssh')
@mock.patch('yardstick.benchmark.scenarios.availability.monitor.'
import unittest
from yardstick.benchmark.scenarios.availability.monitor import monitor_multi
+
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.monitor.'
'monitor_general.ssh')
@mock.patch('yardstick.benchmark.scenarios.availability.monitor.'
}
def test__monitor_multi_all_successful(self, mock_open, mock_ssh):
- ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context, {"nova-api": 10})
+ ins = monitor_multi.MultiMonitor(
+ self.monitor_cfg, self.context, {"nova-api": 10})
mock_ssh.SSH.from_node().execute.return_value = (0, "running", '')
ins.verify_SLA()
def test__monitor_multi_all_fail(self, mock_open, mock_ssh):
- ins = monitor_multi.MultiMonitor(self.monitor_cfg, self.context, {"nova-api": 10})
+ ins = monitor_multi.MultiMonitor(
+ self.monitor_cfg, self.context, {"nova-api": 10})
mock_ssh.SSH.from_node().execute.return_value = (0, "running", '')
ins.start_monitor()
ins.wait_monitor()
ins.verify_SLA()
-
operation_general
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.operation.'
'operation_general.ssh')
@mock.patch('yardstick.benchmark.scenarios.availability.operation.'
result_checker_general
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.availability.result_checker.'
'result_checker_general.ssh')
@mock.patch('yardstick.benchmark.scenarios.availability.result_checker.'
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2016 Huan Li and others
# lihuansse@tongji.edu.cn
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.availability.scenario_general
-
-from __future__ import absolute_import
import mock
import unittest
-from yardstick.benchmark.scenarios.availability.scenario_general import \
- ScenarioGeneral
-
+from yardstick.benchmark.scenarios.availability import scenario_general
-@mock.patch(
- 'yardstick.benchmark.scenarios.availability.scenario_general.Director')
class ScenarioGeneralTestCase(unittest.TestCase):
def setUp(self):
'index': 2}]
}
}
+ self.instance = scenario_general.ScenarioGeneral(self.scenario_cfg, None)
+
+ self._mock_director = mock.patch.object(scenario_general, 'Director')
+ self.mock_director = self._mock_director.start()
+ self.addCleanup(self._stop_mock)
+
+ def _stop_mock(self):
+ self._mock_director.stop()
- def test_scenario_general_all_successful(self, mock_director):
- ins = ScenarioGeneral(self.scenario_cfg, None)
- ins.setup()
- ins.run({})
- ins.teardown()
+ def test_scenario_general_all_successful(self):
+ self.instance.setup()
+ self.instance.run({})
+ self.instance.teardown()
- def test_scenario_general_exception(self, mock_director):
- ins = ScenarioGeneral(self.scenario_cfg, None)
+ def test_scenario_general_exception(self):
mock_obj = mock.Mock()
mock_obj.createActionPlayer.side_effect = KeyError('Wrong')
- ins.director = mock_obj
- ins.director.data = {}
- ins.run({})
- ins.teardown()
+ self.instance.director = mock_obj
+ self.instance.director.data = {}
+ self.instance.run({})
+ self.instance.teardown()
- def test_scenario_general_case_fail(self, mock_director):
- ins = ScenarioGeneral(self.scenario_cfg, None)
+ def test_scenario_general_case_fail(self):
mock_obj = mock.Mock()
mock_obj.verify.return_value = False
- ins.director = mock_obj
- ins.director.data = {}
- ins.run({})
- ins.pass_flag = True
- ins.teardown()
+ self.instance.director = mock_obj
+ self.instance.director.data = {}
+ self.instance.run({})
+ self.instance.pass_flag = True
+ self.instance.teardown()
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.availability.serviceha
-
-from __future__ import absolute_import
import mock
import unittest
sla = {"outage_time": 5}
self.args = {"options": options, "sla": sla}
- @mock.patch('yardstick.benchmark.scenarios.availability.serviceha.basemonitor')
- @mock.patch(
- 'yardstick.benchmark.scenarios.availability.serviceha.baseattacker')
- def test__serviceha_setup_run_successful(self, _,
- mock_monitor):
+ # NOTE(elfoley): This should be split into test_setup and test_run
+ # NOTE(elfoley): This should explicitly test outcomes and states
+ @mock.patch.object(serviceha, 'baseattacker')
+ @mock.patch.object(serviceha, 'basemonitor')
+ def test__serviceha_setup_run_successful(self, mock_monitor, *args):
p = serviceha.ServiceHA(self.args, self.ctx)
p.setup()
p.setup()
self.assertTrue(p.setup_done)
-# def test__serviceha_run_sla_error(self, mock_attacker, mock_monitor):
-# p = serviceha.ServiceHA(self.args, self.ctx)
+ # def test__serviceha_run_sla_error(self, mock_attacker, mock_monitor):
+ # p = serviceha.ServiceHA(self.args, self.ctx)
-# p.setup()
-# self.assertTrue(p.setup_done)
-#
-# result = {}
-# result["outage_time"] = 10
-# mock_monitor.Monitor().get_result.return_value = result
+ # p.setup()
+ # self.assertEqual(p.setup_done, True)
+
+ # result = {}
+ # result["outage_time"] = 10
+ # mock_monitor.Monitor().get_result.return_value = result
-# ret = {}
-# self.assertRaises(AssertionError, p.run, ret)
+ # ret = {}
+ # self.assertRaises(AssertionError, p.run, ret)
from yardstick.benchmark.scenarios.compute import lmbench
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.compute.lmbench.ssh')
class LmbenchTestCase(unittest.TestCase):
def main():
unittest.main()
+
if __name__ == '__main__':
main()
import mock
-from yardstick.common import utils
from yardstick.benchmark.scenarios.compute import spec_cpu
options = {
"SPECint_benchmark": "perlbench",
- "runspec_tune": "all",
"output_format": "all",
"runspec_iterations": "1",
"runspec_tune": "base",
args = {"options": options}
s = spec_cpu.SpecCPU(args, self.ctx)
- sample_output = ''
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
s.run(self.result)
expected_result = {}
mock_ssh.SSH.from_node().execute.return_value = (1, '', 'FOOBAR')
self.assertRaises(RuntimeError, s.run, self.result)
+
def main():
unittest.main()
+
if __name__ == '__main__':
main()
@mock.patch('yardstick.common.openstack_utils.attach_server_volume')
def test_attach_volume(self, mock_attach_server_volume):
options = {
- 'volume_id': '123-456-000',
- 'server_id': '000-123-456'
+ 'volume_id': '123-456-000',
+ 'server_id': '000-123-456'
}
args = {"options": options}
obj = AttachVolume(args, {})
obj.run({})
self.assertTrue(mock_attach_server_volume.called)
+
def main():
unittest.main()
'target': {
'ipaddr': '172.16.0.138'
}
- }
+ }
@mock.patch('yardstick.benchmark.scenarios.lib.check_connectivity.ssh')
def test_check_connectivity(self, mock_ssh):
'ssh_port': '22',
'ssh_timeout': 600,
'ping_parameter': "-s 2048"
- },
+ },
'sla': {'status': 'True',
'action': 'assert'}
}
- result = {}
+ # TODO(elfoley): Properly check the outputs
+ result = {} # pylint: disable=unused-variable
obj = check_connectivity.CheckConnectivity(args, {})
obj.setup()
mock_ssh.SSH.execute.return_value = (0, '100', '')
-
@mock.patch('yardstick.benchmark.scenarios.lib.check_connectivity.ssh')
def test_check_connectivity_key(self, mock_ssh):
'ssh_port': '22',
'ssh_timeout': 600,
'ping_parameter': "-s 2048"
- },
+ },
'sla': {'status': 'True',
'action': 'assert'}
}
- result = {}
+ # TODO(elfoley): Properly check the outputs
+ result = {} # pylint: disable=unused-variable
obj = check_connectivity.CheckConnectivity(args, self.ctx)
obj.setup()
mock_ssh.SSH.execute.return_value = (0, '100', '')
+
def main():
unittest.main()
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import unittest
import mock
+import unittest
from yardstick.benchmark.scenarios.lib.check_numa_info import CheckNumaInfo
class CheckNumaInfoTestCase(unittest.TestCase):
- @mock.patch(
- 'yardstick.benchmark.scenarios.lib.check_numa_info.CheckNumaInfo._check_vm2_status')
- def test_check_numa_info(self, mock_check_vm2):
+ @mock.patch.object(CheckNumaInfo, '_check_vm2_status')
+ def test_run(self, mock_check_vm2):
scenario_cfg = {'info1': {}, 'info2': {}}
obj = CheckNumaInfo(scenario_cfg, {})
obj.run({})
from yardstick.benchmark.scenarios.lib.check_value import CheckValue
-
class CheckValueTestCase(unittest.TestCase):
+ def setUp(self):
+ self.result = {}
+
def test_check_value_eq(self):
scenario_cfg = {'options': {'operator': 'eq', 'value1': 1, 'value2': 2}}
obj = CheckValue(scenario_cfg, {})
- try:
- obj.run({})
- except Exception as e:
- self.assertIsInstance(e, AssertionError)
+ self.assertRaises(AssertionError, obj.run, self.result)
+ self.assertEqual({}, self.result)
def test_check_value_eq_pass(self):
scenario_cfg = {'options': {'operator': 'eq', 'value1': 1, 'value2': 1}}
obj = CheckValue(scenario_cfg, {})
- try:
- obj.run({})
- except Exception as e:
- self.assertIsInstance(e, AssertionError)
+
+ obj.run(self.result)
+ self.assertEqual({}, self.result)
def test_check_value_ne(self):
scenario_cfg = {'options': {'operator': 'ne', 'value1': 1, 'value2': 1}}
obj = CheckValue(scenario_cfg, {})
- try:
- obj.run({})
- except Exception as e:
- self.assertIsInstance(e, AssertionError)
-
+ self.assertRaises(AssertionError, obj.run, self.result)
+ self.assertEqual({}, self.result)
def main():
unittest.main()
import unittest
import mock
-from yardstick.benchmark.scenarios.lib.create_image import CreateImage
-
+from yardstick.benchmark.scenarios.lib import create_image
+from yardstick.common import openstack_utils
+# NOTE(elfoley): There should be more tests here.
class CreateImageTestCase(unittest.TestCase):
- @mock.patch('yardstick.common.openstack_utils.create_image')
- @mock.patch('yardstick.common.openstack_utils.get_glance_client')
+ @mock.patch.object(openstack_utils, 'create_image')
+ @mock.patch.object(openstack_utils, 'get_glance_client')
def test_create_image(self, mock_get_glance_client, mock_create_image):
options = {
- 'image_name': 'yardstick_test_image_01',
- 'disk_format': 'qcow2',
- 'container_format': 'bare',
- 'min_disk': '1',
- 'min_ram': '512',
- 'protected': 'False',
- 'tags': '["yardstick automatic test image"]',
- 'file_path': '/home/opnfv/images/cirros-0.3.5-x86_64-disk.img'
+ 'image_name': 'yardstick_test_image_01',
+ 'disk_format': 'qcow2',
+ 'container_format': 'bare',
+ 'min_disk': '1',
+ 'min_ram': '512',
+ 'protected': 'False',
+ 'tags': '["yardstick automatic test image"]',
+ 'file_path': '/home/opnfv/images/cirros-0.3.5-x86_64-disk.img'
}
args = {"options": options}
- obj = CreateImage(args, {})
+ obj = create_image.CreateImage(args, {})
obj.run({})
- self.assertTrue(mock_create_image.called)
+ mock_create_image.assert_called_once()
+ mock_get_glance_client.assert_called_once()
def main():
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import unittest
-import mock
-from yardstick.benchmark.scenarios.lib.create_keypair import CreateKeypair
+import mock
+import unittest
-PREFIX = "yardstick.benchmark.scenarios.lib.create_keypair"
+from yardstick.benchmark.scenarios.lib import create_keypair
class CreateKeypairTestCase(unittest.TestCase):
- @mock.patch('{}.paramiko'.format(PREFIX))
- @mock.patch('{}.op_utils'.format(PREFIX))
- def test_create_keypair(self, mock_op_utils, mock_paramiko):
+ @mock.patch.object(create_keypair, 'paramiko')
+ @mock.patch.object(create_keypair, 'op_utils')
+ def test_create_keypair(self, mock_op_utils, *args):
options = {
'key_name': 'yardstick_key',
'key_path': '/tmp/yardstick_key'
}
args = {"options": options}
- obj = CreateKeypair(args, {})
+ obj = create_keypair.CreateKeypair(args, {})
obj.run({})
- self.assertTrue(mock_op_utils.create_keypair.called)
+ mock_op_utils.create_keypair.assert_called_once()
def main():
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.create_network import CreateNetwork
@mock.patch('yardstick.common.openstack_utils.create_neutron_net')
def test_create_network(self, mock_get_neutron_client, mock_create_neutron_net):
options = {
- 'openstack_paras': {
- 'name': 'yardstick_net',
- 'admin_state_up': 'True'
- }
+ 'openstack_paras': {
+ 'name': 'yardstick_net',
+ 'admin_state_up': 'True'
+ }
}
args = {"options": options}
obj = CreateNetwork(args, {})
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.create_port import CreatePort
@mock.patch('yardstick.common.openstack_utils.get_neutron_client')
def test_create_port(self, mock_get_neutron_client):
options = {
- 'openstack_paras': {
- 'name': 'yardstick_port'
- }
+ 'openstack_paras': {
+ 'name': 'yardstick_port'
+ }
}
args = {"options": options}
obj = CreatePort(args, {})
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.create_router import CreateRouter
@mock.patch('yardstick.common.openstack_utils.create_neutron_router')
def test_create_router(self, mock_get_neutron_client, mock_create_neutron_router):
options = {
- 'openstack_paras': {
- 'admin_state_up': 'True',
- 'name': 'yardstick_router'
- }
+ 'openstack_paras': {
+ 'admin_state_up': 'True',
+ 'name': 'yardstick_router'
+ }
}
args = {"options": options}
obj = CreateRouter(args, {})
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.create_sec_group import CreateSecgroup
@mock.patch('yardstick.common.openstack_utils.create_security_group_full')
def test_create_sec_group(self, mock_get_neutron_client, mock_create_security_group_full):
options = {
- 'openstack_paras': {
- 'sg_name': 'yardstick_sec_group',
- 'description': 'security group for yardstick manual VM'
- }
+ 'openstack_paras': {
+ 'sg_name': 'yardstick_sec_group',
+ 'description': 'security group for yardstick manual VM'
+ }
}
args = {"options": options}
obj = CreateSecgroup(args, {})
def test_create_server(self, mock_get_nova_client, mock_get_neutron_client,
mock_get_glance_client, mock_create_instance_and_wait_for_active):
scenario_cfg = {
- 'options' : {
- 'openstack_paras': 'example'
- },
- 'output': 'server'
+ 'options': {
+ 'openstack_paras': 'example'
+ },
+ 'output': 'server'
}
obj = CreateServer(scenario_cfg, {})
obj.run({})
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.create_subnet import CreateSubnet
@mock.patch('yardstick.common.openstack_utils.create_neutron_subnet')
def test_create_subnet(self, mock_get_neutron_client, mock_create_neutron_subnet):
options = {
- 'openstack_paras': {
- 'network_id': '123-123-123',
- 'name': 'yardstick_subnet',
- 'cidr': '10.10.10.0/24',
- 'ip_version': '4'
- }
+ 'openstack_paras': {
+ 'network_id': '123-123-123',
+ 'name': 'yardstick_subnet',
+ 'cidr': '10.10.10.0/24',
+ 'ip_version': '4'
+ }
}
args = {"options": options}
obj = CreateSubnet(args, {})
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-import unittest
import mock
+import unittest
-import yardstick.benchmark.scenarios.lib.create_volume
+from yardstick.benchmark.scenarios.lib import create_volume
class CreateVolumeTestCase(unittest.TestCase):
}
}
- self.scenario = (
- yardstick.benchmark.scenarios.lib.create_volume.CreateVolume(
- scenario_cfg=self.scenario_cfg,
- context_cfg={}))
+ self.scenario = create_volume.CreateVolume(
+ scenario_cfg=self.scenario_cfg,
+ context_cfg={})
def _stop_mock(self):
self._mock_cinder_client.stop()
expected_im_name = self.scenario_cfg["options"]["image"]
expected_im_id = None
- scenario = (
- yardstick.benchmark.scenarios.lib.create_volume.CreateVolume(
- scenario_cfg=self.scenario_cfg,
- context_cfg={}))
+ scenario = create_volume.CreateVolume(
+ scenario_cfg=self.scenario_cfg,
+ context_cfg={})
self.assertEqual(expected_vol_name, scenario.volume_name)
self.assertEqual(expected_vol_size, scenario.volume_size)
mock_image_id.assert_called_once()
mock_create_volume.assert_called_once()
- @mock.patch.object(
- yardstick.benchmark.scenarios.lib.create_volume.CreateVolume, 'setup')
+ @mock.patch.object(create_volume.CreateVolume, 'setup')
def test_run_no_setup(self, scenario_setup):
self.scenario.setup_done = False
self.scenario.run()
scenario_setup.assert_called_once()
+ @mock.patch('yardstick.common.openstack_utils.create_volume')
+ @mock.patch('yardstick.common.openstack_utils.get_image_id')
+ @mock.patch('yardstick.common.openstack_utils.get_cinder_client')
+ @mock.patch('yardstick.common.openstack_utils.get_glance_client')
+ def test_create_volume(self, mock_get_glance_client,
+ mock_get_cinder_client, mock_image_id,
+ mock_create_volume):
+ options = {
+ 'volume_name': 'yardstick_test_volume_01',
+ 'size': '256',
+ 'image': 'cirros-0.3.5'
+ }
+ args = {"options": options}
+ scenario = create_volume.CreateVolume(args, {})
+ scenario.run()
+ self.assertTrue(mock_create_volume.called)
+ self.assertTrue(mock_image_id.called)
+ self.assertTrue(mock_get_glance_client.called)
+ self.assertTrue(mock_get_cinder_client.called)
+
def main():
unittest.main()
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_floating_ip import DeleteFloatingIp
@mock.patch('yardstick.common.openstack_utils.get_glance_client')
def test_delete_image(self, mock_get_glance_client, mock_image_id, mock_delete_image):
options = {
- 'image_name': 'yardstick_test_image_01'
+ 'image_name': 'yardstick_test_image_01'
}
args = {"options": options}
obj = DeleteImage(args, {})
self.assertTrue(mock_image_id.called)
self.assertTrue(mock_get_glance_client.called)
+
def main():
unittest.main()
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_keypair import DeleteKeypair
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_network import DeleteNetwork
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_port import DeletePort
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_router import DeleteRouter
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_router_gateway import DeleteRouterGateway
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_router_interface import DeleteRouterInterface
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.delete_volume import DeleteVolume
##############################################################################
import unittest
import mock
-import paramiko
from yardstick.benchmark.scenarios.lib.detach_volume import DetachVolume
from yardstick.benchmark.scenarios.lib.get_numa_info import GetNumaInfo
+
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
BASE = 'yardstick.benchmark.scenarios.lib.get_numa_info'
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.networking.nstat.Nstat
-
-from __future__ import absolute_import
-
-import unittest
-
import mock
+import unittest
from yardstick.benchmark.scenarios.networking import nstat
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 Ericsson AB and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.networking.pktgen.Pktgen
-
-from __future__ import absolute_import
-
+import mock
import unittest
-import mock
from oslo_serialization import jsonutils
from yardstick.benchmark.scenarios.networking import pktgen
p.server = mock_ssh.SSH.from_node()
p.client = mock_ssh.SSH.from_node()
- mock_iptables_result = mock.Mock()
- mock_iptables_result.return_value = 149300
- p._iptables_get_result = mock_iptables_result
+ p._iptables_get_result = mock.Mock(return_value=149300)
sample_output = '{"packets_per_second": 9753, "errors": 0, \
"packets_sent": 149776, "packetsize": 60, "flows": 110, "ppm": 3179}'
p.server = mock_ssh.SSH.from_node()
p.client = mock_ssh.SSH.from_node()
- mock_iptables_result = mock.Mock()
- mock_iptables_result.return_value = 149300
- p._iptables_get_result = mock_iptables_result
+ p._iptables_get_result = mock.Mock(return_value=149300)
sample_output = '{"packets_per_second": 9753, "errors": 0, \
"packets_sent": 149776, "packetsize": 60, "flows": 110, "ppm": 3179}'
p.server = mock_ssh.SSH.from_node()
p.client = mock_ssh.SSH.from_node()
- mock_iptables_result = mock.Mock()
- mock_iptables_result.return_value = 149300
- p._iptables_get_result = mock_iptables_result
+ p._iptables_get_result = mock.Mock(return_value=149300)
sample_output = '{"packets_per_second": 9753, "errors": 0, \
"packets_sent": 149776, "packetsize": 60, "flows": 110}'
mock_ssh.SSH.from_node().execute.return_value = (0, '4', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = 1
- p._get_usable_queue_number = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = 4
- p._get_available_queue_number = mock_result2
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=4)
p.queue_number = p._enable_ovs_multiqueue()
self.assertEqual(p.queue_number, 4)
mock_ssh.SSH.from_node().execute.return_value = (0, '1', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = 1
- p._get_usable_queue_number = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = 1
- p._get_available_queue_number = mock_result2
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=1)
p.queue_number = p._enable_ovs_multiqueue()
self.assertEqual(p.queue_number, 1)
mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = 1
- p._get_usable_queue_number = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = 4
- p._get_available_queue_number = mock_result2
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=4)
self.assertRaises(RuntimeError, p._enable_ovs_multiqueue)
mock_ssh.SSH.from_node().execute.return_value = (0, '4', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = False
- p._is_irqbalance_disabled = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = "virtio_net"
- p._get_vnic_driver_name = mock_result2
-
- mock_result3 = mock.Mock()
- mock_result3.return_value = 1
- p._get_usable_queue_number = mock_result3
-
- mock_result4 = mock.Mock()
- mock_result4.return_value = 4
- p._get_available_queue_number = mock_result4
+ p._is_irqbalance_disabled = mock.Mock(return_value=False)
+ p._get_vnic_driver_name = mock.Mock(return_value="virtio_net")
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=4)
p.multiqueue_setup()
mock_ssh.SSH.from_node().execute.return_value = (0, '1', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = False
- p._is_irqbalance_disabled = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = "virtio_net"
- p._get_vnic_driver_name = mock_result2
-
- mock_result3 = mock.Mock()
- mock_result3.return_value = 1
- p._get_usable_queue_number = mock_result3
-
- mock_result4 = mock.Mock()
- mock_result4.return_value = 1
- p._get_available_queue_number = mock_result4
+ p._is_irqbalance_disabled = mock.Mock(return_value=False)
+ p._get_vnic_driver_name = mock.Mock(return_value="virtio_net")
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=1)
p.multiqueue_setup()
mock_ssh.SSH.from_node().execute.return_value = (0, '2', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = False
- p._is_irqbalance_disabled = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = "ixgbevf"
- p._get_vnic_driver_name = mock_result2
+ p._is_irqbalance_disabled = mock.Mock(return_value=False)
+ p._get_vnic_driver_name = mock.Mock(return_value="ixgbevf")
p.multiqueue_setup()
mock_ssh.SSH.from_node().execute.return_value = (0, '1', '')
- mock_result1 = mock.Mock()
- mock_result1.return_value = False
- p._is_irqbalance_disabled = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = "ixgbevf"
- p._get_vnic_driver_name = mock_result2
+ p._is_irqbalance_disabled = mock.Mock(return_value=False)
+ p._get_vnic_driver_name = mock.Mock(return_value="ixgbevf")
p.multiqueue_setup()
p.server = mock_ssh.SSH.from_node()
p.client = mock_ssh.SSH.from_node()
- mock_result = mock.Mock()
- mock_result.return_value = "virtio_net"
- p._get_vnic_driver_name = mock_result
-
- mock_result1 = mock.Mock()
- mock_result1.return_value = 1
- p._get_usable_queue_number = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = 4
- p._get_available_queue_number = mock_result2
-
- mock_result3 = mock.Mock()
- mock_result3.return_value = 4
- p._enable_ovs_multiqueue = mock_result3
-
- mock_result4 = mock.Mock()
- p._setup_irqmapping_ovs = mock_result4
-
- mock_iptables_result = mock.Mock()
- mock_iptables_result.return_value = 149300
- p._iptables_get_result = mock_iptables_result
+ p._get_vnic_driver_name = mock.Mock(return_value="virtio_net")
+ p._get_usable_queue_number = mock.Mock(return_value=1)
+ p._get_available_queue_number = mock.Mock(return_value=4)
+ p._enable_ovs_multiqueue = mock.Mock(return_value=4)
+ p._setup_irqmapping_ovs = mock.Mock()
+ p._iptables_get_result = mock.Mock(return_value=149300)
sample_output = '{"packets_per_second": 9753, "errors": 0, \
"packets_sent": 149300, "flows": 110, "ppm": 0}'
p.server = mock_ssh.SSH.from_node()
p.client = mock_ssh.SSH.from_node()
- mock_result1 = mock.Mock()
- mock_result1.return_value = "ixgbevf"
- p._get_vnic_driver_name = mock_result1
-
- mock_result2 = mock.Mock()
- mock_result2.return_value = 2
- p._get_sriov_queue_number = mock_result2
-
- mock_result3 = mock.Mock()
- p._setup_irqmapping_sriov = mock_result3
-
- mock_iptables_result = mock.Mock()
- mock_iptables_result.return_value = 149300
- p._iptables_get_result = mock_iptables_result
+ p._get_vnic_driver_name = mock.Mock(return_value="ixgbevf")
+ p._get_sriov_queue_number = mock.Mock(return_value=2)
+ p._setup_irqmapping_sriov = mock.Mock()
+ p._iptables_get_result = mock.Mock(return_value=149300)
sample_output = '{"packets_per_second": 9753, "errors": 0, \
"packets_sent": 149300, "flows": 110, "ppm": 0}'
-#!/usr/bin/env python
-
##############################################################################
# Copyright (c) 2015 ZTE and others.
#
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-# Unittest for yardstick.benchmark.scenarios.networking.pktgen.Pktgen
-
-from __future__ import absolute_import
-import unittest
-
import mock
+import unittest
import yardstick.common.utils as utils
from yardstick.benchmark.scenarios.networking import pktgen_dpdk
+#!/usr/bin/env python
##############################################################################
# Copyright (c) 2017 Nokia and others.
#
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
##############################################################################
-#!/usr/bin/env python
# Unittest for yardstick.benchmark.scenarios.networking.pktgen.PktgenDPDK
from yardstick.benchmark.scenarios.networking import pktgen_dpdk_throughput
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
@mock.patch('yardstick.benchmark.scenarios.networking.pktgen_dpdk_throughput.ssh')
class PktgenDPDKTestCase(unittest.TestCase):
from copy import deepcopy
-from tests.unit import STL_MOCKS
+from yardstick.tests.unit import STL_MOCKS
from yardstick.benchmark.scenarios.networking.vnf_generic import \
SshManager, NetworkServiceTestCase, IncorrectConfig, \
open_relative_file
GenericTrafficGen, GenericVNF
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
COMPLETE_TREX_VNFD = {
'vnfd:vnfd-catalog': {
'vnfd': [
ssh_mock.execute = \
mock.Mock(return_value=(0, SYS_CLASS_NET + IP_ADDR_SHOW, ""))
ssh.from_node.return_value = ssh_mock
- for node, node_dict in self.context_cfg["nodes"].items():
+ for _, node_dict in self.context_cfg["nodes"].items():
with SshManager(node_dict) as conn:
self.assertIsNotNone(conn)
self.scenario_cfg["traffic_options"]["flow"] = \
self._get_file_abspath("ipv4_1flow_Packets_vpe.yaml")
result = '152.16.100.2-152.16.100.254'
- self.assertEqual(result, self.s._get_ip_flow_range('152.16.100.2-152.16.100.254'))
+ self.assertEqual(result, self.s._get_ip_flow_range(
+ '152.16.100.2-152.16.100.254'))
def test__get_ip_flow_range(self):
self.scenario_cfg["traffic_options"]["flow"] = \
self.scenario_cfg["options"] = {}
self.scenario_cfg['options'] = {
'flow': {
- 'src_ip': [
- {
- 'tg__1': 'xe0',
- },
- ],
- 'dst_ip': [
- {
- 'tg__1': 'xe1',
- },
- ],
- 'public_ip': ['1.1.1.1'],
+ 'src_ip': [
+ {
+ 'tg__1': 'xe0',
+ },
+ ],
+ 'dst_ip': [
+ {
+ 'tg__1': 'xe1',
+ },
+ ],
+ 'public_ip': ['1.1.1.1'],
},
}
- result = {'flow': {'dst_ip0': '152.16.40.2-152.16.40.254',
- 'src_ip0': '152.16.100.2-152.16.100.254'}}
self.assertEqual({'flow': {}}, self.s._get_traffic_flow())
self.s.map_topology_to_infrastructure()
nodes = self.context_cfg["nodes"]
- self.assertEqual("../../vnf_descriptors/tg_rfc2544_tpl.yaml", nodes['tg__1']['VNF model'])
- self.assertEqual("../../vnf_descriptors/vpe_vnf.yaml", nodes['vnf__1']['VNF model'])
+ self.assertEqual(
+ "../../vnf_descriptors/tg_rfc2544_tpl.yaml", nodes['tg__1']['VNF model'])
+ self.assertEqual("../../vnf_descriptors/vpe_vnf.yaml",
+ nodes['vnf__1']['VNF model'])
def test_map_topology_to_infrastructure_insufficient_nodes(self):
del self.context_cfg['nodes']['vnf__1']
del interface['local_mac']
with mock.patch(
- "yardstick.benchmark.scenarios.networking.vnf_generic.LOG") as mock_log:
+ "yardstick.benchmark.scenarios.networking.vnf_generic.LOG"):
with self.assertRaises(IncorrectConfig) as raised:
self.s._resolve_topology()
self.s.topology["vld"][0]['vnfd-connection-point-ref'][0])
with mock.patch(
- "yardstick.benchmark.scenarios.networking.vnf_generic.LOG") as mock_log:
+ "yardstick.benchmark.scenarios.networking.vnf_generic.LOG"):
with self.assertRaises(IncorrectConfig) as raised:
self.s._resolve_topology()
self.s.topology["vld"][0]['vnfd-connection-point-ref'][:1]
with mock.patch(
- "yardstick.benchmark.scenarios.networking.vnf_generic.LOG") as mock_log:
+ "yardstick.benchmark.scenarios.networking.vnf_generic.LOG"):
with self.assertRaises(IncorrectConfig) as raised:
self.s._resolve_topology()
tgen.name = "tgen__1"
vnf = mock.Mock(autospec=GenericVNF)
vnf.runs_traffic = False
- vnf.instantiate.side_effect = RuntimeError("error during instantiate")
+ vnf.instantiate.side_effect = RuntimeError(
+ "error during instantiate")
vnf.terminate = mock.Mock(return_value=True)
self.s.vnfs = [tgen, vnf]
self.s.traffic_profile = mock.Mock()
def test___get_traffic_imix_exception(self):
with mock.patch.dict(self.scenario_cfg["traffic_options"], {'imix': ''}):
- self.assertEqual({'imix': {'64B': 100}}, self.s._get_traffic_imix())
+ self.assertEqual({'imix': {'64B': 100}},
+ self.s._get_traffic_imix())
def test__fill_traffic_profile(self):
with mock.patch.dict("sys.modules", STL_MOCKS):
def test_teardown_exception(self):
vnf = mock.Mock(autospec=GenericVNF)
- vnf.terminate = mock.Mock(side_effect=RuntimeError("error duing terminate"))
+ vnf.terminate = mock.Mock(
+ side_effect=RuntimeError("error duing terminate"))
vnf.name = str(vnf)
self.s.vnfs = [vnf]
self.s.traffic_profile = mock.Mock()
NetworkServiceTestCase._probe_missing_values(netdevs, network)
assert network['vpci'] == '0000:00:19.0'
+ # TODO: Split this into several tests, for different IOError sub-types
def test_open_relative_path(self):
mock_open = mock.mock_open()
mock_open_result = mock_open()
# test
with mock.patch(module_name, mock_open, create=True):
- self.assertEqual(open_relative_file('foo', 'bar'), mock_open_result)
+ self.assertEqual(open_relative_file(
+ 'foo', 'bar'), mock_open_result)
mock_open_call_count += 1 # one more call expected
self.assertEqual(mock_open.call_count, mock_open_call_count)
raise IOError(errno.ENOENT, 'not found')
mock_open.side_effect = open_effect
- self.assertEqual(open_relative_file('foo', 'bar'), mock_open_result)
+ self.assertEqual(open_relative_file(
+ 'foo', 'bar'), mock_open_result)
mock_open_call_count += 2 # two more calls expected
self.assertEqual(mock_open.call_count, mock_open_call_count)
--- /dev/null
+# Copyright 2017 Nokia
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import subprocess
+import time
+
+import mock
+import unittest
+
+from yardstick.benchmark.scenarios.networking import vsperf_dpdk
+
+
+class VsperfDPDKTestCase(unittest.TestCase):
+
+ def setUp(self):
+ self.ctx = {
+ "host": {
+ "ip": "10.229.47.137",
+ "user": "ubuntu",
+ "password": "ubuntu",
+ },
+ }
+ self.args = {
+ 'task_id': "1234-5678",
+ 'options': {
+ 'testname': 'pvp_tput',
+ 'traffic_type': 'rfc2544_throughput',
+ 'frame_size': '64',
+ 'test_params': 'TRAFFICGEN_DURATION=30;',
+ 'trafficgen_port1': 'ens4',
+ 'trafficgen_port2': 'ens5',
+ 'conf_file': 'vsperf-yardstick.conf',
+ 'setup_script': 'setup_yardstick.sh',
+ 'moongen_helper_file': '~/moongen.py',
+ 'moongen_host_ip': '10.5.201.151',
+ 'moongen_port1_mac': '8c:dc:d4:ae:7c:5c',
+ 'moongen_port2_mac': '8c:dc:d4:ae:7c:5d',
+ 'trafficgen_port1_nw': 'test2',
+ 'trafficgen_port2_nw': 'test3',
+ },
+ 'sla': {
+ 'metrics': 'throughput_rx_fps',
+ 'throughput_rx_fps': 500000,
+ 'action': 'monitor',
+ }
+ }
+
+ self.scenario = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
+
+ self._mock_ssh = mock.patch(
+ 'yardstick.benchmark.scenarios.networking.vsperf_dpdk.ssh')
+ self.mock_ssh = self._mock_ssh.start()
+ self._mock_subprocess_call = mock.patch.object(subprocess, 'call')
+ self.mock_subprocess_call = self._mock_subprocess_call.start()
+
+ self.addCleanup(self._cleanup)
+
+ def _cleanup(self):
+ self._mock_ssh.stop()
+ self._mock_subprocess_call.stop()
+
+ def test_setup(self):
+ # setup() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ def test_teardown(self):
+ # setup() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ self.scenario.teardown()
+ self.assertFalse(self.scenario.setup_done)
+
+ def test_is_dpdk_setup_no(self):
+ # setup() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ # is_dpdk_setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
+
+ result = self.scenario._is_dpdk_setup()
+ self.assertFalse(result)
+
+ def test_is_dpdk_setup_yes(self):
+ # setup() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ # is_dpdk_setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+
+ result = self.scenario._is_dpdk_setup()
+ self.assertTrue(result)
+
+ @mock.patch.object(time, 'sleep')
+ def test_dpdk_setup_first(self, *args):
+ # setup() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ # is_dpdk_setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
+
+ self.scenario.dpdk_setup()
+ self.assertFalse(self.scenario._is_dpdk_setup())
+ self.assertTrue(self.scenario.dpdk_setup_done)
+
+ @mock.patch.object(time, 'sleep')
+ def test_dpdk_setup_next(self, *args):
+ # setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ self.scenario.dpdk_setup()
+ self.assertTrue(self.scenario._is_dpdk_setup())
+ self.assertTrue(self.scenario.dpdk_setup_done)
+
+ @mock.patch.object(time, 'sleep')
+ def test_dpdk_setup_runtime_error(self, *args):
+
+ # setup specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
+ self.assertTrue(self.scenario.setup_done)
+
+ self.assertRaises(RuntimeError, self.scenario.dpdk_setup)
+
+ @mock.patch.object(subprocess, 'check_output')
+ @mock.patch('time.sleep')
+ def test_run_ok(self, *args):
+ # setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ # run() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+ self.mock_ssh.SSH.from_node().execute.return_value = (
+ 0, 'throughput_rx_fps\r\n14797660.000\r\n', '')
+
+ result = {}
+ self.scenario.run(result)
+
+ self.assertEqual(result['throughput_rx_fps'], '14797660.000')
+
+ def test_run_failed_vsperf_execution(self):
+ # setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
+
+ result = {}
+ self.assertRaises(RuntimeError, self.scenario.run, result)
+
+ def test_run_falied_csv_report(self):
+ # setup() specific mocks
+ self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ self.mock_subprocess_call().execute.return_value = None
+
+ self.scenario.setup()
+ self.assertIsNotNone(self.scenario.client)
+ self.assertTrue(self.scenario.setup_done)
+
+ # run() specific mocks
+ self.mock_subprocess_call().execute.return_value = None
+ self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
+
+ result = {}
+ self.assertRaises(RuntimeError, self.scenario.run, result)
+
+
+def main():
+ unittest.main()
+
+
+if __name__ == '__main__':
+ main()
import mock
-from yardstick.common import utils
from yardstick.benchmark.scenarios.storage import bonnie
mock_ssh.SSH.from_node().execute.return_value = (1, '', 'FOOBAR')
self.assertRaises(RuntimeError, b.run, self.result)
+
def main():
unittest.main()
+
if __name__ == '__main__':
main()
from yardstick.benchmark.scenarios.storage import storperf
+# pylint: disable=unused-argument
+# disable this for now because I keep forgetting mock patch arg ordering
+
+
def mocked_requests_config_post(*args, **kwargs):
- class MockResponseConfigPost:
+ class MockResponseConfigPost(object):
def __init__(self, json_data, status_code):
self.content = json_data
def mocked_requests_config_get(*args, **kwargs):
- class MockResponseConfigGet:
+ class MockResponseConfigGet(object):
def __init__(self, json_data, status_code):
self.content = json_data
def mocked_requests_job_get(*args, **kwargs):
- class MockResponseJobGet:
+ class MockResponseJobGet(object):
def __init__(self, json_data, status_code):
self.content = json_data
def mocked_requests_job_post(*args, **kwargs):
- class MockResponseJobPost:
+ class MockResponseJobPost(object):
def __init__(self, json_data, status_code):
self.content = json_data
def mocked_requests_job_delete(*args, **kwargs):
- class MockResponseJobDelete:
+ class MockResponseJobDelete(object):
def __init__(self, json_data, status_code):
self.content = json_data
def mocked_requests_delete(*args, **kwargs):
- class MockResponseDelete:
+ class MockResponseDelete(object):
def __init__(self, json_data, status_code):
self.json_data = json_data
def mocked_requests_delete_failed(*args, **kwargs):
- class MockResponseDeleteFailed:
+ class MockResponseDeleteFailed(object):
def __init__(self, json_data, status_code):
self.json_data = json_data