Specify latency bucket size & correct calculation
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2019 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18
19 import os
20 import subprocess
21 import socket
22
23 class prox_ctrl(object):
24     def __init__(self, ip, key=None, user=None):
25         self._ip   = ip
26         self._key  = key
27         self._user = user
28         self._children = []
29         self._proxsock = []
30
31     def ip(self):
32         return self._ip
33
34     def connect(self):
35         """Simply try to run 'true' over ssh on remote system.
36         On failure, raise RuntimeWarning exception when possibly worth
37         retrying, and raise RuntimeError exception otherwise.
38         """
39         return self.run_cmd('true', True)
40
41     def close(self):
42         """Must be called before program termination."""
43 #        for prox in self._proxsock:
44 #            prox.quit()
45         children = len(self._children)
46         if children == 0:
47             return
48         if children > 1:
49             print('Waiting for %d child processes to complete ...' % children)
50         for child in self._children:
51             ret = os.waitpid(child[0], os.WNOHANG)
52             if ret[0] == 0:
53                 print("Waiting for child process '%s' to complete ..." % child[1])
54                 ret = os.waitpid(child[0], 0)
55             rc = ret[1]
56             if os.WIFEXITED(rc):
57                 if os.WEXITSTATUS(rc) == 0:
58                     print("Child process '%s' completed successfully" % child[1])
59                 else:
60                     print("Child process '%s' returned exit status %d" % (
61                             child[1], os.WEXITSTATUS(rc)))
62             elif os.WIFSIGNALED(rc):
63                 print("Child process '%s' exited on signal %d" % (
64                         child[1], os.WTERMSIG(rc)))
65             else:
66                 print("Wait status for child process '%s' is 0x%04x" % (
67                         child[1], rc))
68
69     def run_cmd(self, command, _connect=False):
70         """Execute command over ssh on remote system.
71         Wait for remote command completion.
72         Return command output (combined stdout and stderr).
73         _connect argument is reserved for connect() method.
74         """
75         cmd = self._build_ssh(command)
76         try:
77             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
78         except subprocess.CalledProcessError as ex:
79             if _connect and ex.returncode == 255:
80                 raise RuntimeWarning(ex.output.strip())
81             raise RuntimeError('ssh returned exit status %d:\n%s'
82                     % (ex.returncode, ex.output.strip()))
83
84     def fork_cmd(self, command, name=None):
85         """Execute command over ssh on remote system, in a child process.
86         Do not wait for remote command completion.
87         Return child process id.
88         """
89         if name is None:
90             name = command
91         cmd = self._build_ssh(command)
92         pid = os.fork()
93         if (pid != 0):
94             # In the parent process
95             self._children.append((pid, name))
96             return pid
97         # In the child process: use os._exit to terminate
98         try:
99             # Actually ignore output on success, but capture stderr on failure
100             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
101         except subprocess.CalledProcessError as ex:
102             raise RuntimeError("Child process '%s' failed:\n"
103                     'ssh returned exit status %d:\n%s'
104                     % (name, ex.returncode, ex.output.strip()))
105         os._exit(0)
106
107     def prox_sock(self, port=8474):
108         """Connect to the PROX instance on remote system.
109         Return a prox_sock object on success, None on failure.
110         """
111         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
112         try:
113             sock.connect((self._ip, port))
114             prox = prox_sock(sock)
115             self._proxsock.append(prox)
116             return prox
117         except:
118             return None
119
120     def scp_put(self, src, dst):
121         """Copy src file from local system to dst on remote system."""
122         cmd = [ 'scp',
123                 '-B',
124                 '-oStrictHostKeyChecking=no',
125                 '-oUserKnownHostsFile=/dev/null',
126                 '-oLogLevel=ERROR' ]
127         if self._key is not None:
128             cmd.extend(['-i', self._key])
129         cmd.append(src)
130         remote = ''
131         if self._user is not None:
132             remote += self._user + '@'
133         remote += self._ip + ':' + dst
134         cmd.append(remote)
135         try:
136             # Actually ignore output on success, but capture stderr on failure
137             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
138         except subprocess.CalledProcessError as ex:
139             raise RuntimeError('scp returned exit status %d:\n%s'
140                     % (ex.returncode, ex.output.strip()))
141
142     def _build_ssh(self, command):
143         cmd = [ 'ssh',
144                 '-oBatchMode=yes',
145                 '-oStrictHostKeyChecking=no',
146                 '-oUserKnownHostsFile=/dev/null',
147                 '-oLogLevel=ERROR' ]
148         if self._key is not None:
149             cmd.extend(['-i', self._key])
150         remote = ''
151         if self._user is not None:
152             remote += self._user + '@'
153         remote += self._ip
154         cmd.append(remote)
155         cmd.append(command)
156         return cmd
157
158 class prox_sock(object):
159     def __init__(self, sock):
160         self._sock = sock
161         self._rcvd = b''
162
163     def quit(self):
164         if self._sock is not None:
165             self._send('quit')
166             self._sock.close()
167             self._sock = None
168
169     def start(self, cores):
170         self._send('start %s' % ','.join(map(str, cores)))
171
172     def stop(self, cores):
173         self._send('stop %s' % ','.join(map(str, cores)))
174
175     def speed(self, speed, cores, tasks=[0]):
176         for core in cores:
177                 for task in tasks:
178                         self._send('speed %s %s %s' % (core, task, speed))
179
180     def reset_stats(self):
181         self._send('reset stats')
182
183     def lat_stats(self, cores, tasks=[0]):
184         min_lat = 999999999
185         max_lat = avg_lat = 0
186         number_tasks_returning_stats = 0
187         buckets = [0] * 128
188         self._send('lat all stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
189         for core in cores:
190                 for task in tasks:
191                         stats = self._recv().split(',')
192                         if 'is not measuring' in stats[0]:
193                                 continue
194                         if stats[0].startswith('error'):
195                                 log.critical("lat stats error: unexpected reply from PROX (potential incompatibility between scripts and PROX)")
196                                 raise Exception("lat stats error")
197                         number_tasks_returning_stats += 1
198                         min_lat = min(int(stats[0]),min_lat)
199                         max_lat = max(int(stats[1]),max_lat)
200                         avg_lat += int(stats[2])
201                         #min_since begin = int(stats[3])
202                         #max_since_begin = int(stats[4])
203                         tsc = int(stats[5]) # Taking the last tsc as the timestamp since PROX will return the same tsc for each core/task combination 
204                         hz = int(stats[6])
205                         #coreid = int(stats[7])
206                         #taskid = int(stats[8])
207                         stats = self._recv().split(':')
208                         if stats[0].startswith('error'):
209                                 log.critical("lat stats error: unexpected lat bucket reply (potential incompatibility between scripts and PROX)")
210                                 raise Exception("lat bucket reply error")
211                         buckets[0] = int(stats[1])
212                         for i in range(1, 128):
213                                 stats = self._recv().split(':')
214                                 buckets[i] = int(stats[1])
215         avg_lat = avg_lat/number_tasks_returning_stats
216         self._send('stats latency(0).used')
217         used = float(self._recv())
218         self._send('stats latency(0).total')
219         total = float(self._recv())
220         return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets
221
222     def old_lat_stats(self, cores, tasks=[0]):
223         min_lat = 999999999
224         max_lat = avg_lat = 0
225         number_tasks_returning_stats = 0
226         buckets = [0] * 128
227         self._send('lat stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
228         for core in cores:
229                 for task in tasks:
230                         stats = self._recv().split(',')
231                         if stats[0].startswith('error'):
232                                 if stats[0].startswith('error: invalid syntax'):
233                                         log.critical("lat stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
234                                         raise Exception("lat stats error")
235                                 continue
236                         number_tasks_returning_stats += 1
237                         min_lat = min(int(stats[0]),min_lat)
238                         max_lat = max(int(stats[1]),max_lat)
239                         avg_lat += int(stats[2])
240                         #min_since begin = int(stats[3])
241                         #max_since_begin = int(stats[4])
242                         tsc = int(stats[5])
243                         hz = int(stats[6])
244                         #coreid = int(stats[7])
245                         #taskid = int(stats[8])
246         avg_lat = avg_lat/number_tasks_returning_stats
247         self._send('stats latency(0).used')
248         used = float(self._recv())
249         self._send('stats latency(0).total')
250         total = float(self._recv())
251         return min_lat, max_lat, avg_lat, (used/total), tsc, hz, buckets
252
253     def irq_stats(self, core, bucket, task=0):
254         self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket))
255         stats = self._recv().split(',')
256         return int(stats[0])
257
258     def show_irq_buckets(self, core, task=0):
259         rx = tx = drop = tsc = hz = 0
260         self._send('show irq buckets %s %s' % (core,task))
261         buckets = self._recv().split(';')
262         buckets = buckets[:-1]
263         return buckets
264
265     def core_stats(self, cores, tasks=[0]):
266         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
267         self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
268         for core in cores:
269                 for task in tasks:
270                         stats = self._recv().split(',')
271                         if stats[0].startswith('error'):  
272                                 if stats[0].startswith('error: invalid syntax'):
273                                         log.critical("dp core stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
274                                         raise Exception("dp core stats error")
275                                 continue
276                         rx += int(stats[0])
277                         tx += int(stats[1])
278                         rx_non_dp += int(stats[2])
279                         tx_non_dp += int(stats[3])
280                         drop += int(stats[4])
281                         tx_fail += int(stats[5])
282                         tsc = int(stats[6])
283                         hz = int(stats[7])
284         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
285
286     def multi_port_stats(self, ports=[0]):
287         rx = tx = port_id = tsc = no_mbufs = errors = 0
288         self._send('multi port stats %s' % (','.join(map(str, ports))))
289         result = self._recv().split(';')
290         if result[0].startswith('error'):  
291                 log.critical("multi port stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
292                 raise Exception("multi port stats error")
293         for statistics in result:
294                 stats = statistics.split(',')
295                 port_id = int(stats[0])
296                 rx += int(stats[1])
297                 tx += int(stats[2])
298                 no_mbufs += int(stats[3])
299                 errors += int(stats[4])
300                 tsc = int(stats[5])
301         return rx, tx, no_mbufs, errors, tsc
302
303     def set_random(self, cores, task, offset, mask, length):
304         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length))
305
306     def set_size(self, cores, task, pkt_size):
307         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size))
308
309     def set_value(self, cores, task, offset, value, length):
310         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length))
311
312     def _send(self, cmd):
313         """Append LF and send command to the PROX instance."""
314         if self._sock is None:
315             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
316         self._sock.sendall(cmd.encode() + b'\n')
317
318     def _recv(self):
319         """Receive response from PROX instance, and return it with LF removed."""
320         if self._sock is None:
321             raise RuntimeError("PROX socket closed, cannot receive anymore")
322         pos = self._rcvd.find(b'\n')
323         while pos == -1:
324             self._rcvd += self._sock.recv(256)
325             pos = self._rcvd.find(b'\n')
326         rsp = self._rcvd[:pos]
327         self._rcvd = self._rcvd[pos+1:]
328         return rsp.decode()
329