3 ##############################################################################
4 # Copyright (c) 2015 ZTE and others.
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
12 # Unittest for yardstick.benchmark.scenarios.networking.pktgen.Pktgen
18 from yardstick.benchmark.scenarios.networking import pktgen_dpdk
20 @mock.patch('yardstick.benchmark.scenarios.networking.pktgen_dpdk.ssh')
21 class PktgenDPDKLatencyTestCase(unittest.TestCase):
28 'key_filename': 'mykey.key'
33 'key_filename': 'mykey.key',
34 'ipaddr': '172.16.0.138'
38 def test_pktgen_dpdk_successful_setup(self, mock_ssh):
41 'options': {'packetsize': 60},
43 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
46 mock_ssh.SSH().execute.return_value = (0, '', '')
47 self.assertIsNotNone(p.server)
48 self.assertIsNotNone(p.client)
49 self.assertEqual(p.setup_done, True)
51 def test_pktgen_dpdk_successful_get_port_ip(self, mock_ssh):
54 'options': {'packetsize': 60},
56 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
57 p.server = mock_ssh.SSH()
59 mock_ssh.SSH().execute.return_value = (0, '', '')
61 p.get_port_ip(p.server, "eth1")
63 mock_ssh.SSH().execute.assert_called_with(
64 "ifconfig eth1 |grep 'inet addr' |awk '{print $2}' \
67 def test_pktgen_dpdk_unsuccessful_get_port_ip(self, mock_ssh):
70 'options': {'packetsize': 60},
73 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
74 p.server = mock_ssh.SSH()
76 mock_ssh.SSH().execute.return_value = (1, '', 'FOOBAR')
77 self.assertRaises(RuntimeError, p.get_port_ip, p.server, "eth1")
79 def test_pktgen_dpdk_successful_get_port_mac(self, mock_ssh):
82 'options': {'packetsize': 60},
84 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
85 p.server = mock_ssh.SSH()
87 mock_ssh.SSH().execute.return_value = (0, '', '')
89 p.get_port_mac(p.server, "eth1")
91 mock_ssh.SSH().execute.assert_called_with(
92 "ifconfig |grep HWaddr |grep eth1 |awk '{print $5}' ")
94 def test_pktgen_dpdk_unsuccessful_get_port_mac(self, mock_ssh):
97 'options': {'packetsize': 60},
100 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
101 p.server = mock_ssh.SSH()
103 mock_ssh.SSH().execute.return_value = (1, '', 'FOOBAR')
104 self.assertRaises(RuntimeError, p.get_port_mac, p.server, "eth1")
106 def test_pktgen_dpdk_successful_no_sla(self, mock_ssh):
109 'options': {'packetsize': 60},
113 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
115 sample_output = '100\n110\n112\n130\n149\n150\n90\n150\n200\n162\n'
116 mock_ssh.SSH().execute.return_value = (0, sample_output, '')
119 self.assertEqual(result, {"avg_latency": 132})
121 def test_pktgen_dpdk_successful_sla(self, mock_ssh):
124 'options': {'packetsize': 60},
125 'sla': {'max_latency': 100}
129 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
131 sample_output = '100\n100\n100\n100\n100\n100\n100\n100\n100\n100\n'
132 mock_ssh.SSH().execute.return_value = (0, sample_output, '')
136 self.assertEqual(result, {"avg_latency": 100})
138 def test_pktgen_dpdk_unsuccessful_sla(self, mock_ssh):
141 'options': {'packetsize': 60},
142 'sla': {'max_latency': 100}
146 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
148 p.server = mock_ssh.SSH()
149 p.client = mock_ssh.SSH()
151 sample_output = '100\n110\n112\n130\n149\n150\n90\n150\n200\n162\n'
152 mock_ssh.SSH().execute.return_value = (0, sample_output, '')
153 self.assertRaises(AssertionError, p.run, result)
155 def test_pktgen_dpdk_unsuccessful_script_error(self, mock_ssh):
158 'options': {'packetsize': 60},
159 'sla': {'max_latency': 100}
163 p = pktgen_dpdk.PktgenDPDKLatency(args, self.ctx)
165 mock_ssh.SSH().execute.return_value = (1, '', 'FOOBAR')
166 self.assertRaises(RuntimeError, p.run, result)
172 if __name__ == '__main__':