Fix latency buckets with multiple lat cores
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog
29
30 class prox_ctrl(object):
31     def __init__(self, ip, key=None, user=None):
32         self._ip   = ip
33         self._key  = key
34         self._user = user
35         self._proxsock = []
36
37     def ip(self):
38         return self._ip
39
40     def test_connect(self):
41         """Simply try to run 'true' over ssh on remote system.
42         On failure, raise RuntimeWarning exception when possibly worth
43         retrying, and raise RuntimeError exception otherwise.
44         """
45         return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
46
47     def connect(self):
48         attempts = 1
49         RapidLog.debug("Trying to connect to machine \
50                 on %s, attempt: %d" % (self._ip, attempts))
51         while True:
52             try:
53                 self.test_connect()
54                 break
55             except RuntimeWarning as ex:
56                 RapidLog.debug("RuntimeWarning %d:\n%s"
57                     % (ex.returncode, ex.output.strip()))
58                 attempts += 1
59                 if attempts > 20:
60                     RapidLog.exception("Failed to connect to instance after %d\
61                             attempts:\n%s" % (attempts, ex))
62                 time.sleep(2)
63                 RapidLog.debug("Trying to connect to machine \
64                        on %s, attempt: %d" % (self._ip, attempts))
65         RapidLog.debug("Connected to machine on %s" % self._ip)
66
67     def connect_socket(self):
68         attempts = 1
69         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
70                 attempt: %d" % (self._ip, attempts))
71         sock = None
72         while True:
73             sock = self.prox_sock()
74             if sock is not None:
75                 break
76             attempts += 1
77             if attempts > 20:
78                 RapidLog.exception("Failed to connect to PROX on %s after %d \
79                         attempts" % (self._ip, attempts))
80             time.sleep(2)
81             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
82                     attempt: %d" % (self._ip, attempts))
83         RapidLog.info("Connected to PROX on %s" % self._ip)
84         return sock
85
86     def close(self):
87         for sock in self._proxsock:
88             sock.quit()
89
90     def run_cmd(self, command, _connect=False):
91         """Execute command over ssh on remote system.
92         Wait for remote command completion.
93         Return command output (combined stdout and stderr).
94         _connect argument is reserved for connect() method.
95         """
96         cmd = self._build_ssh(command)
97         try:
98             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
99         except subprocess.CalledProcessError as ex:
100             RapidLog.exception('ssh returned exit status %d:\n%s'
101                     % (ex.returncode, ex.output.strip()))
102
103     def prox_sock(self, port=8474):
104         """Connect to the PROX instance on remote system.
105         Return a prox_sock object on success, None on failure.
106         """
107         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
108         try:
109             sock.connect((self._ip, port))
110             prox = prox_sock(sock)
111             self._proxsock.append(prox)
112             return prox
113         except:
114             return None
115
116     def scp_put(self, src, dst):
117         """Copy src file from local system to dst on remote system."""
118         cmd = [ 'scp',
119                 '-B',
120                 '-oStrictHostKeyChecking=no',
121                 '-oUserKnownHostsFile=/dev/null',
122                 '-oLogLevel=ERROR' ]
123         if self._key is not None:
124             cmd.extend(['-i', self._key])
125         cmd.append(src)
126         remote = ''
127         if self._user is not None:
128             remote += self._user + '@'
129         remote += self._ip + ':' + dst
130         cmd.append(remote)
131         try:
132             # Actually ignore output on success, but capture stderr on failure
133             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
134         except subprocess.CalledProcessError as ex:
135             RapidLog.exception('scp returned exit status %d:\n%s'
136                     % (ex.returncode, ex.output.strip()))
137
138     def scp_get(self, src, dst):
139         """Copy src file from remote system to dst on local system."""
140         cmd = [ 'scp',
141                 '-B',
142                 '-oStrictHostKeyChecking=no',
143                 '-oUserKnownHostsFile=/dev/null',
144                 '-oLogLevel=ERROR' ]
145         if self._key is not None:
146             cmd.extend(['-i', self._key])
147         remote = ''
148         if self._user is not None:
149             remote += self._user + '@'
150         remote += self._ip + ':/home/' + self._user + src
151         cmd.append(remote)
152         cmd.append(dst)
153         try:
154             # Actually ignore output on success, but capture stderr on failure
155             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
156         except subprocess.CalledProcessError as ex:
157             RapidLog.exception('scp returned exit status %d:\n%s'
158                     % (ex.returncode, ex.output.strip()))
159
160     def _build_ssh(self, command):
161         cmd = [ 'ssh',
162                 '-oBatchMode=yes',
163                 '-oStrictHostKeyChecking=no',
164                 '-oUserKnownHostsFile=/dev/null',
165                 '-oLogLevel=ERROR' ]
166         if self._key is not None:
167             cmd.extend(['-i', self._key])
168         remote = ''
169         if self._user is not None:
170             remote += self._user + '@'
171         remote += self._ip
172         cmd.append(remote)
173         cmd.append(command)
174         return cmd
175
176 class prox_sock(object):
177     def __init__(self, sock):
178         self._sock = sock
179         self._rcvd = b''
180
181     def __del__(self):
182         if self._sock is not None:
183             self._sock.close()
184             self._sock = None
185
186     def start(self, cores):
187         self._send('start %s' % ','.join(map(str, cores)))
188
189     def stop(self, cores):
190         self._send('stop %s' % ','.join(map(str, cores)))
191
192     def speed(self, speed, cores, tasks=[0]):
193         for core in cores:
194             for task in tasks:
195                 self._send('speed %s %s %s' % (core, task, speed))
196
197     def reset_stats(self):
198         self._send('reset stats')
199
200     def lat_stats(self, cores, tasks=[0]):
201         result = {}
202         result['lat_min'] = 999999999
203         result['lat_max'] = result['lat_avg'] = 0
204         result['buckets'] = [0] * 128
205         result['mis_ordered'] = 0
206         result['extent'] = 0
207         result['duplicate'] = 0
208         number_tasks_returning_stats = 0
209         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
210             ','.join(map(str, tasks))))
211         for core in cores:
212             for task in tasks:
213                 stats = self._recv().split(',')
214             if 'is not measuring' in stats[0]:
215                 continue
216             if stats[0].startswith('error'):
217                 RapidLog.critical("lat stats error: unexpected reply from PROX\
218                         (potential incompatibility between scripts and PROX)")
219                 raise Exception("lat stats error")
220             number_tasks_returning_stats += 1
221             result['lat_min'] = min(int(stats[0]),result['lat_min'])
222             result['lat_max'] = max(int(stats[1]),result['lat_max'])
223             result['lat_avg'] += int(stats[2])
224             #min_since begin = int(stats[3])
225             #max_since_begin = int(stats[4])
226             result['lat_tsc'] = int(stats[5])
227             # Taking the last tsc as the timestamp since
228             # PROX will return the same tsc for each
229             # core/task combination
230             result['lat_hz'] = int(stats[6])
231             #coreid = int(stats[7])
232             #taskid = int(stats[8])
233             result['mis_ordered'] += int(stats[9])
234             result['extent'] += int(stats[10])
235             result['duplicate'] += int(stats[11])
236             stats = self._recv().split(':')
237             if stats[0].startswith('error'):
238                 RapidLog.critical("lat stats error: unexpected lat bucket \
239                         reply (potential incompatibility between scripts \
240                         and PROX)")
241                 raise Exception("lat bucket reply error")
242             result['buckets'][0] = int(stats[1])
243             for i in range(1, 128):
244                 stats = self._recv().split(':')
245                 result['buckets'][i] += int(stats[1])
246         result['lat_avg'] = old_div(result['lat_avg'],
247                 number_tasks_returning_stats)
248         self._send('stats latency(0).used')
249         used = float(self._recv())
250         self._send('stats latency(0).total')
251         total = float(self._recv())
252         result['lat_used'] = old_div(used,total)
253         return (result)
254
255     def irq_stats(self, core, bucket, task=0):
256         self._send('stats task.core(%s).task(%s).irq(%s)' %
257                 (core, task, bucket))
258         stats = self._recv().split(',')
259         return int(stats[0])
260
261     def show_irq_buckets(self, core, task=0):
262         rx = tx = drop = tsc = hz = 0
263         self._send('show irq buckets %s %s' % (core,task))
264         buckets = self._recv().split(';')
265         buckets = buckets[:-1]
266         return buckets
267
268     def core_stats(self, cores, tasks=[0]):
269         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
270         self._send('dp core stats %s %s' % (','.join(map(str, cores)),
271             ','.join(map(str, tasks))))
272         for core in cores:
273             for task in tasks:
274                 stats = self._recv().split(',')
275                 if stats[0].startswith('error'):
276                     if stats[0].startswith('error: invalid syntax'):
277                         RapidLog.critical("dp core stats error: unexpected \
278                                 invalid syntax (potential incompatibility \
279                                 between scripts and PROX)")
280                         raise Exception("dp core stats error")
281                     continue
282                 rx += int(stats[0])
283                 tx += int(stats[1])
284                 rx_non_dp += int(stats[2])
285                 tx_non_dp += int(stats[3])
286                 drop += int(stats[4])
287                 tx_fail += int(stats[5])
288                 tsc = int(stats[6])
289                 hz = int(stats[7])
290         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
291
292     def multi_port_stats(self, ports=[0]):
293         rx = tx = port_id = tsc = no_mbufs = errors = 0
294         self._send('multi port stats %s' % (','.join(map(str, ports))))
295         result = self._recv().split(';')
296         if result[0].startswith('error'):
297             RapidLog.critical("multi port stats error: unexpected invalid \
298                     syntax (potential incompatibility between scripts and \
299                     PROX)")
300             raise Exception("multi port stats error")
301         for statistics in result:
302             stats = statistics.split(',')
303             port_id = int(stats[0])
304             rx += int(stats[1])
305             tx += int(stats[2])
306             no_mbufs += int(stats[3])
307             errors += int(stats[4])
308             tsc = int(stats[5])
309         return rx, tx, no_mbufs, errors, tsc
310
311     def set_random(self, cores, task, offset, mask, length):
312         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
313             task, offset, mask, length))
314
315     def set_size(self, cores, task, pkt_size):
316         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
317             pkt_size))
318
319     def set_imix(self, cores, task, imix):
320         self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
321             ','.join(map(str,imix))))
322
323     def set_value(self, cores, task, offset, value, length):
324         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
325             task, offset, value, length))
326
327     def quit_prox(self):
328         self._send('quit')
329
330     def _send(self, cmd):
331         """Append LF and send command to the PROX instance."""
332         if self._sock is None:
333             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
334         try:
335             self._sock.sendall(cmd.encode() + b'\n')
336         except ConnectionResetError as e:
337             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
338             raise
339
340     def _recv(self):
341         """Receive response from PROX instance, return it with LF removed."""
342         if self._sock is None:
343             raise RuntimeError("PROX socket closed, cannot receive anymore")
344         try:
345             pos = self._rcvd.find(b'\n')
346             while pos == -1:
347                 self._rcvd += self._sock.recv(256)
348                 pos = self._rcvd.find(b'\n')
349             rsp = self._rcvd[:pos]
350             self._rcvd = self._rcvd[pos+1:]
351         except ConnectionResetError as e:
352             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
353             raise
354         return rsp.decode()