rubbos installation guide
[bottlenecks.git] / vstf / vstf / agent / unittest / perf / test_pktgen.py
1 """
2 Created on 2015-9-24
3
4 @author: y00228926
5 """
6 import unittest
7 import time
8
9 from vstf.agent.unittest.perf import model
10 from vstf.agent.perf import pktgen
11
12
13 class TestPktgen(model.Model):
14     def setUp(self):
15         super(TestPktgen, self).setUp()
16
17     def tearDown(self):
18         super(TestPktgen, self).tearDown()
19
20     def test_single_thread(self):
21         perf = pktgen.Pktgen()
22         print perf.receive_start()
23         send = {
24             "src": [
25                 {"iface": self.device_list[0], "mac": self.mac_list[0]}
26             ],
27             "dst": [
28                 {"mac": self.mac_list[1]}
29             ],
30             "size": 64,
31             "threads": 1,
32             'ratep': 0
33         }
34         ret = perf.send_start(**send)
35         self.assertEqual((0, 'start pktgen send success'), ret, "send_start failed, ret=%s" % str(ret))
36         time.sleep(5)
37         ret = perf.send_stop()
38         self.assertEqual([(0, '')], ret, "send_start failed, ret=%s" % ret)
39         ret = perf.receive_stop()
40         self.assertEqual((0, 'pktgen neednt receive stop'), ret, "send_stop failed, ret=%s" % str(ret))
41
42     def test_single_thread_bidirectional(self):
43         perf = pktgen.Pktgen()
44         print perf.receive_start()
45         send = {
46             "src": [
47                 {"iface": self.device_list[0], "mac": self.mac_list[0]},
48                 {"iface": self.device_list[1], "mac": self.mac_list[1]}
49             ],
50             "dst": [
51                 {"mac": self.mac_list[1]},
52                 {"mac": self.mac_list[0]}
53             ],
54             "size": 64,
55             "threads": 1,
56             'ratep': 0
57         }
58         ret = perf.send_start(**send)
59         self.assertEqual((0, 'start pktgen send success'), ret, "send_start failed, ret=%s" % str(ret))
60         time.sleep(5)
61         ret = perf.send_stop()
62         self.assertEqual([(0, '')], ret, "send_start failed, ret=%s" % ret)
63         ret = perf.receive_stop()
64         self.assertEqual((0, 'pktgen neednt receive stop'), ret, "send_stop failed, ret=%s" % str(ret))
65
66     def test_clean(self):
67         perf = pktgen.Pktgen()
68         print perf.receive_start()
69         send = {
70             "src": [
71                 {"iface": self.device_list[0], "mac": self.mac_list[0]}
72             ],
73             "dst": [
74                 {"mac": self.mac_list[1]}
75             ],
76             "size": 64,
77             "threads": 1,
78             'ratep': 0
79         }
80         ret = perf.send_start(**send)
81         self.assertEqual((0, 'start pktgen send success'), ret, "send_start failed, ret=%s" % str(ret))
82         perf.clean()
83
84
85 if __name__ == "__main__":
86     import logging
87
88     logging.getLogger(__name__)
89     logging.basicConfig(level=logging.DEBUG)
90     unittest.main()