3cb9eafd876a2b3d9f45c9a27ed0f376cc82142b
[bottlenecks.git] / vstf / vstf / agent / perf / qperf.py
1 #!/usr/bin/python
2 # -*- coding: utf8 -*-
3 # author:
4 # date: 2015-09-15
5 # see license for license details
6
7 import subprocess
8 import time
9 import logging
10 import vstf.common.decorator as deco
11 from vstf.common import perfmark as mark
12 from vstf.common.utils import kill_by_name, my_popen
13
14 LOG = logging.getLogger(__name__)
15
16
class Qperf(object):
    """Drive the ``qperf`` command line tool for latency measurements.

    A receiver is a bare ``qperf`` process (its listening/server mode); a
    sender is a ``qperf <dst> ... tcp_lat|udp_lat`` client whose standard
    output is parsed for the reported latency. Started processes are tracked
    in ``_send_processes`` / ``_receive_processes`` until they are stopped.

    NOTE(review): ``deco.check`` and ``deco.namespace`` are defined outside
    this file; presumably they validate/default the keyword arguments and
    wrap the command for a network namespace -- confirm against
    ``vstf.common.decorator``.
    """

    # (numerator, denominator) pairs converting a value expressed in one of
    # the time units qperf is expected to print into milliseconds.
    _TO_MILLISECONDS = {
        "ns": (1, 1000000),
        "us": (1, 1000),
        "ms": (1, 1),
        "sec": (1000, 1),
    }

    def __init__(self):
        self._send_processes = []
        self._receive_processes = []

    @staticmethod
    def _spawn(cmd):
        """Launch ``cmd`` and probe it after a short grace period.

        Returns ``(process, exit_code)``; ``exit_code`` is ``None`` while the
        process is still running 0.5 seconds after it was started.
        """
        process = my_popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        time.sleep(0.5)
        return process, process.poll()

    @staticmethod
    def _log_early_exit(role, process, ret):
        """Reap a process that died right after start and log why.

        ``communicate()`` both waits for the process and drains its pipes, so
        the stderr text can be included in the log entry.
        Assumes ``my_popen`` returns a ``subprocess.Popen``-like object.
        """
        _, err = process.communicate()
        LOG.error("qperf %s exited early, code:%s, stderr:%s", role, ret, err)

    @deco.check("protocol", choices=['tcp_lat', 'udp_lat'])
    @deco.check("namespace", defaults=None)
    @deco.check("dst")
    @deco.check("time", defaults=10)
    @deco.check("size", defaults=64)
    def send_start(self, **kwargs):
        """Start a qperf client.

        Returns ``(ret, message)``: ``ret`` is 0 when the client is still
        running after the grace period, otherwise the exit code of the
        process that terminated prematurely.
        """
        cmd = self.format_send_start(**kwargs)
        LOG.info("cmd:%s", cmd)
        process, ret = self._spawn(cmd)
        if ret is None:
            self._send_processes.append(process)
            return 0, "start qperf send success"

        self._log_early_exit("send", process, ret)
        return ret, "start qperf send failed, %s" % (str(kwargs))

    @deco.namespace()
    def format_send_start(self, **kwargs):
        """Build the qperf client command line from ``kwargs``.

        Reads ``dst`` (the ``ip`` of its first entry), ``protocol``, ``time``
        and ``size``.
        """
        cmd = "qperf %(dst_ip)s -t %(time)s -m %(pkt_size)s -vu %(type)s "
        context = {
            'dst_ip': kwargs['dst'][0]['ip'],
            'type': kwargs['protocol'],
            'time': kwargs['time'],
            'pkt_size': kwargs['size'],
        }
        return cmd % context

    def send_stop(self, **kwargs):
        """Wait for every started client and return its parsed result.

        Returns a list of ``(0, result_dict)`` tuples, one per client, and
        forgets the clients afterwards.
        """
        results = []
        for process in self._send_processes:
            # communicate() drains stdout/stderr while waiting; wait()
            # followed by read() can deadlock once a pipe buffer fills up.
            output, _ = process.communicate()
            results.append((0, self._parse_data(output)))
        self._send_processes = []
        return results

    @deco.namespace()
    def format_receive_start(self, **kwargs):
        """Build the qperf receiver command line (plain ``qperf``)."""
        cmd = 'qperf'
        return cmd

    def receive_start(self, **kwargs):
        """Start a qperf receiver.

        Returns ``(0, message)`` on success.

        Raises:
            Exception: if the receiver has already exited after the grace
                period.
        """
        cmd = self.format_receive_start(**kwargs)
        LOG.info("cmd:%s", cmd)

        process, ret = self._spawn(cmd)
        if ret is not None:
            self._log_early_exit("receive", process, ret)
            raise Exception("start qperf receive failed, %s" % (str(kwargs)))

        self._receive_processes.append(process)
        return 0, "start qperf receive success"

    def receive_stop(self, **kwargs):
        """Kill and reap every tracked receiver; returns ``(0, message)``."""
        for process in self._receive_processes:
            process.kill()
            process.wait()
        self._receive_processes = []
        return 0, "stop qperf receive success"

    def receive_kill(self):
        """Kill every process named ``qperf`` and forget tracked receivers."""
        kill_by_name('qperf')
        self._receive_processes = []
        return True

    def clean(self):
        """Kill tracked receivers, wait for tracked clients, forget both."""
        for process in self._receive_processes:
            process.kill()
            process.wait()
            LOG.info("process.kill(qperf daemon:%s)", process.pid)
        for process in self._send_processes:
            LOG.info("process.wait(qperf client:%s)", process.pid)
            # Drain the pipes while waiting so a chatty client cannot block
            # forever on a full pipe buffer.
            process.communicate()
        self._receive_processes = []
        self._send_processes = []
        return True

    def force_clean(self):
        """Kill every process named ``qperf`` and drop all tracked handles."""
        LOG.info("%s %s start", self.__class__, self.force_clean.__name__)
        kill_by_name('qperf')
        self._send_processes = []
        self._receive_processes = []
        return True

    def _latency_in_ms(self, line):
        """Convert a ``latency = <value> <unit>`` line to milliseconds.

        Returns 0 (after logging a warning) when the line cannot be parsed.
        An unrecognised unit is treated as microseconds, which is how every
        value used to be scaled.
        """
        fields = line.strip().split()
        if len(fields) < 4:
            LOG.warning("unexpected qperf latency line: %r", line)
            return 0
        try:
            value = float(fields[2])
        except ValueError:
            LOG.warning("non-numeric qperf latency value: %r", line)
            return 0

        scale = self._TO_MILLISECONDS.get(fields[3])
        if scale is None:
            LOG.warning("unknown qperf latency unit %r, assuming us", fields[3])
            scale = self._TO_MILLISECONDS["us"]
        numerator, denominator = scale
        return value * numerator / denominator

    def _parse_data(self, data):
        """Extract the latency from qperf client output.

        The second output line is expected to hold the latency. The same
        value is reported as minimum, average and maximum latency; it is 0
        when no latency could be found.
        """
        LOG.info(data)
        latency = 0
        if data:
            # Pipes yield bytes on Python 3; on Python 2 bytes is str and
            # this branch is skipped.
            if isinstance(data, bytes) and not isinstance(data, str):
                data = data.decode("utf-8", "replace")
            lines = data.splitlines()
            if len(lines) > 1 and "latency" in lines[1]:
                latency = self._latency_in_ms(lines[1])
        result = {
            mark.minLatency: latency,
            mark.avgLatency: latency,
            mark.maxLatency: latency
        }
        return result
140
141
def unit_test():
    """Manual smoke test: run one UDP latency measurement end to end.

    Starts a receiver, runs a sender against 192.168.1.102 and prints the
    value returned by each step. Needs qperf and the hard-coded destination,
    so it is only meant to be run by hand.
    """
    perf = Qperf()
    perf.receive_start(namespace='receive')

    # From here on the receiver is running; stop it even if the send phase
    # raises, otherwise the qperf daemon is left behind.
    try:
        send = {
            "namespace": "send",
            "protocol": "udp_lat",
            "dst": [
                {"ip": "192.168.1.102"}
            ],
            "size": 64,
        }
        # Single-argument print(...) prints the same on Python 2 and 3.
        print(perf.send_start(**send))
        time.sleep(10)
        print(perf.send_stop())
    finally:
        print(perf.receive_stop())
157     print perf.receive_stop()
158
159
if __name__ == "__main__":
    # Script entry point: configure debug logging to the vstf log file and
    # to the console, then run the manual smoke test.
    from vstf.common.log import setup_logging

    setup_logging(
        level=logging.DEBUG,
        log_file="/var/log/vstf/vstf-qperf.log",
        clevel=logging.DEBUG,
    )
    unit_test()