f84f9c2402950f47bc09f041fb16a0daa5c22a79
[bottlenecks.git] / vstf / vstf / agent / perf / qperf.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import subprocess
11 import time
12 import logging
13 import vstf.common.decorator as deco
14 from vstf.common import perfmark as mark
15 from vstf.common.utils import kill_by_name, my_popen
16
17 LOG = logging.getLogger(__name__)
18
19
20 class Qperf(object):
21     def __init__(self):
22         self._send_processes = []
23         self._receive_processes = []
24
25     @deco.check("protocol", choices=['tcp_lat', 'udp_lat'])
26     @deco.check("namespace", defaults=None)
27     @deco.check("dst")
28     @deco.check("time", defaults=10)
29     @deco.check("size", defaults=64)
30     def send_start(self, **kwargs):
31         cmd = self.format_send_start(**kwargs)
32         LOG.info("cmd:%s", cmd)
33         process = my_popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
34         time.sleep(0.5)
35         ret = process.poll()
36         if ret is None:
37             ret = 0
38             error_str = "start qperf send success"
39             self._send_processes.append(process)
40         else:
41             print ret
42             error_str = "start qperf send failed, %s" % (str(kwargs))
43             process.wait()
44
45         return ret, error_str
46
47     @deco.namespace()
48     def format_send_start(self, **kwargs):
49         cmd = "qperf %(dst_ip)s -t %(time)s -m %(pkt_size)s -vu %(type)s "
50         context = {
51             'dst_ip': kwargs['dst'][0]['ip'],
52             'type': kwargs['protocol'],
53             'time': kwargs['time'],
54             'pkt_size': kwargs['size'],
55         }
56         cmd = cmd % context
57         return cmd
58
59     def send_stop(self, **kwargs):
60         results = []
61         for process in self._send_processes:
62             process.wait()
63             read = process.stdout.read()
64             read = self._parse_data(read)
65             ret = 0
66             results.append((ret, read))
67         self._send_processes = []
68         return results
69
70     @deco.namespace()
71     def format_receive_start(self, **kwargs):
72         cmd = 'qperf'
73         return cmd
74
75     def receive_start(self, **kwargs):
76         cmd = self.format_receive_start(**kwargs)
77         LOG.info("cmd:%s", cmd)
78
79         process = my_popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
80         time.sleep(0.5)
81         ret = process.poll()
82         if ret is None:
83             ret = 0
84             error_str = "start qperf receive success"
85             self._receive_processes.append(process)
86         else:
87             print ret
88             error_str = "start qperf receive failed, %s" % (str(kwargs))
89             process.wait()
90             raise Exception(error_str)
91         return ret, error_str
92
93     def receive_stop(self, **kwargs):
94         ret = 0
95         for process in self._receive_processes:
96             process.kill()
97             process.wait()
98         self._receive_processes = []
99         error_str = "stop qperf receive success"
100         return ret, error_str
101
102     def receive_kill(self):
103         kill_by_name('qperf')
104         self._receive_processes = []
105         return True
106
107     def clean(self):
108         for process in self._receive_processes:
109             process.kill()
110             process.wait()
111             LOG.info("process.kill(qperf daemon:%s)", process.pid)
112         for process in self._send_processes:
113             LOG.info("process.wait(qperf client:%s)", process.pid)
114             process.wait()
115         self._receive_processes = []
116         self._send_processes = []
117         return True
118
119     def force_clean(self):
120         LOG.info("%s %s start", self.__class__, self.force_clean.__name__)
121         kill_by_name('qperf')
122         self._send_processes = []
123         self._receive_processes = []
124         return True
125
126     def _parse_data(self, data):
127         LOG.info(data)
128         latency = 0
129         if data:
130             buf = data.splitlines()
131             if "latency" in buf[1]:
132                 data = buf[1].strip().split()
133                 if data[3] == "us":
134                     latency = float(data[2]) / 1000
135                 else:
136                     latency = float(data[2]) / 1000
137         result = {
138             mark.minLatency: latency,
139             mark.avgLatency: latency,
140             mark.maxLatency: latency
141         }
142         return result
143
144
145 def unit_test():
146     perf = Qperf()
147     perf.receive_start(namespace='receive')
148
149     send = {
150         "namespace": "send",
151         "protocol": "udp_lat",
152         "dst": [
153             {"ip": "192.168.1.102"}
154         ],
155         "size": 64,
156     }
157     print perf.send_start(**send)
158     time.sleep(10)
159     print perf.send_stop()
160     print perf.receive_stop()
161
162
163 if __name__ == "__main__":
164     from vstf.common.log import setup_logging
165
166     setup_logging(level=logging.DEBUG, log_file="/var/log/vstf/vstf-qperf.log", clevel=logging.DEBUG)
167     unit_test()