# Unittest for yardstick.benchmark.scenarios.networking.vsperf.Vsperf
-import mock
+from __future__ import absolute_import
+try:
+ from unittest import mock
+except ImportError:
+ import mock
import unittest
-import os
-import subprocess
from yardstick.benchmark.scenarios.networking import vsperf
@mock.patch('yardstick.benchmark.scenarios.networking.vsperf.subprocess')
@mock.patch('yardstick.benchmark.scenarios.networking.vsperf.ssh')
-@mock.patch("__builtin__.open", return_value=None)
+@mock.patch("yardstick.benchmark.scenarios.networking.vsperf.open",
+ mock.mock_open())
class VsperfTestCase(unittest.TestCase):
def setUp(self):
}
}
- def test_vsperf_setup(self, mock_open, mock_ssh, mock_subprocess):
+ def test_vsperf_setup(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
- mock_ssh.SSH().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
mock_subprocess.call().execute.return_value = None
p.setup()
self.assertIsNotNone(p.client)
self.assertEqual(p.setup_done, True)
- def test_vsperf_teardown(self, mock_open, mock_ssh, mock_subprocess):
+ def test_vsperf_teardown(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
# setup() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
mock_subprocess.call().execute.return_value = None
p.setup()
p.teardown()
self.assertEqual(p.setup_done, False)
- def test_vsperf_run_ok(self, mock_open, mock_ssh, mock_subprocess):
+ def test_vsperf_run_ok(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
# setup() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
mock_subprocess.call().execute.return_value = None
# run() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
- mock_ssh.SSH().execute.return_value = (0, 'throughput_rx_fps\r\n14797660.000\r\n', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (
+ 0, 'throughput_rx_fps\r\n14797660.000\r\n', '')
result = {}
p.run(result)
self.assertEqual(result['throughput_rx_fps'], '14797660.000')
- def test_vsperf_run_falied_vsperf_execution(self, mock_open, mock_ssh, mock_subprocess):
+ def test_vsperf_run_falied_vsperf_execution(self, mock_ssh,
+ mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
# setup() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
mock_subprocess.call().execute.return_value = None
# run() specific mocks
- mock_ssh.SSH().execute.return_value = (1, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
result = {}
self.assertRaises(RuntimeError, p.run, result)
- def test_vsperf_run_falied_csv_report(self, mock_open, mock_ssh, mock_subprocess):
+ def test_vsperf_run_falied_csv_report(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
# setup() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
mock_subprocess.call().execute.return_value = None
# run() specific mocks
- mock_ssh.SSH().execute.return_value = (0, '', '')
- mock_ssh.SSH().execute.return_value = (1, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
+ mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
result = {}
self.assertRaises(RuntimeError, p.run, result)
def main():
unittest.main()
+
if __name__ == '__main__':
main()