Upload the contribution of vstf as bottleneck network framework.
[bottlenecks.git] / vstf / vstf / agent / perf / netperf.py
1 #!/usr/bin/python
2 # -*- coding: utf8 -*-
3 # author:
4 # date: 2015-09-15
5 # see license for license details
6 import time
7 import subprocess
8 import vstf.common.constants as cst
9 import vstf.common.decorator as deco
10 from vstf.common import perfmark as mark
11 from vstf.common.utils import kill_by_name, my_popen
12
13 import logging
14
15 LOG = logging.getLogger(__name__)
16
17
18 class Netperf(object):
19     def __init__(self):
20         self._send_processes = []
21         self._islat = False
22         self._typemap = {
23             "tcp_lat": "TCP_STREAM",
24             "tcp_bw": "TCP_STREAM",
25             "udp_lat": "UDP_STREAM",
26             "udp_bw": "UDP_STREAM",
27         }
28
29     @deco.check("protocol", choices=cst.PROTOCOLS)
30     @deco.check("namespace", defaults=None)
31     @deco.check("dst")
32     @deco.check("time", defaults=0)
33     @deco.check("size", defaults=64)
34     @deco.check("threads", defaults=1)
35     def send_start(self, **kwargs):
36         threads = kwargs.pop('threads')
37         kwargs['buf'] = cst.SOCKET_BUF
38         if kwargs['protocol'] in ['tcp_lat', 'udp_lat']:
39             self._islat = True
40         else:
41             kwargs['time'] = 0
42
43         cmd = self.format_send_start(**kwargs)
44         LOG.info("cmd:%s", cmd)
45
46         for _ in range(threads):
47             process = my_popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
48             self._send_processes.append(process)
49         time.sleep(0.5)
50         for process in self._send_processes:
51             ret = process.poll()
52             if ret is None:
53                 ret = 0
54                 error_str = "start netperf send success"
55             else:
56                 error_str = "start netperf send failed, %s" % (str(kwargs))
57                 process.wait()
58                 self._send_processes.remove(process)
59
60         return ret, error_str
61
62     def send_stop(self, **kwargs):
63         LOG.info("send_stop")
64         results = []
65         ret = 0
66         for process in self._send_processes:
67             poll = process.poll()
68             if poll is None:
69                 if not self._islat:
70                     process.kill()
71                     read = "process is stopped by killed"
72                 else:
73                     ret = process.wait()
74                     read = process.stdout.read()
75                     read = self._parse_data(read)
76                 results.append((ret, read))
77         self._send_processes = []
78         self._islat = False
79         return results
80
81     @staticmethod
82     def _parse_data(data):
83         buf = data.splitlines()
84         data = buf[2].strip().split(',')
85         result = {
86             mark.minLatency: float(data[0]),
87             mark.avgLatency: float(data[1]),
88             mark.maxLatency: float(data[2])
89         }
90         return result
91
92     @deco.namespace()
93     def format_send_start(self, **kwargs):
94         #       cmd = "netperf -H %(dst_ip)s -t %(type)s -l %(time)s -- -m %(pkt_size)s "
95         cmd = "netperf -H %(dst_ip)s -t %(type)s -l %(time)s  " \
96               "-- -m %(pkt_size)s -s %(buf)s -S %(buf)s -o  MIN_LATENCY,MEAN_LATENCY,MAX_LATENCY"
97         context = {
98             'dst_ip': kwargs['dst'][0]['ip'],
99             'type': self._typemap[kwargs['protocol']],
100             'time': kwargs['time'],
101             'pkt_size': kwargs['size'],
102             'buf': kwargs['buf'],
103         }
104         cmd = cmd % context
105         return cmd
106
107     @deco.namespace()
108     def format_receive_start(self, **kwargs):
109         cmd = 'netserver'
110         return cmd
111
112     @deco.check("namespace")
113     def receive_start(self, **kwargs):
114
115         cmd = self.format_receive_start(**kwargs)
116         LOG.info("cmd:%s", cmd)
117
118         process = my_popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
119         time.sleep(0.5)
120         ret = process.poll()
121         if ret:
122             error_str = "start netserver failed, %s" % (str(kwargs))
123         else:
124             ret = 0
125             error_str = "start netserver success"
126
127         return ret, error_str
128
129     def receive_stop(self, **kwargs):
130         LOG.info("receive_stop")
131         ret = 0
132         kill_by_name('netserver')
133         time.sleep(0.5)
134         error_str = "stop netserver success"
135         return ret, error_str
136
137     def clean(self):
138         self.send_stop()
139         self.receive_stop()
140         return True
141
142     def force_clean(self):
143         LOG.info("%s %s start", self.__class__, self.force_clean.__name__)
144         kill_by_name('netserver')
145         kill_by_name('netperf')
146         self._send_processes = []
147         self._receive_processes = []
148         return True
149
150
151 def unit_test():
152     perf = Netperf()
153     ret = perf.receive_start(namespace='receive')
154     print "*********receive_start***********"
155     print ret
156     send = {
157         "namespace": "send",
158         "protocol": "udp_lat",
159         "dst": [
160             {"ip": "192.168.1.102"}
161         ],
162         "size": 64,
163         "threads": 1,
164         "time": 10,
165     }
166     print perf.send_start(**send)
167     print perf._send_processes
168     time.sleep(10)
169     print perf.send_stop()
170     print perf.receive_stop()
171
172
173 if __name__ == "__main__":
174     from vstf.common.log import setup_logging
175
176     setup_logging(level=logging.DEBUG, log_file="/var/log/vstf/vstf-netperf.log", clevel=logging.DEBUG)
177     unit_test()