Code improvements
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog 
29
30 class prox_ctrl(object):
31     def __init__(self, ip, key=None, user=None):
32         self._ip   = ip
33         self._key  = key
34         self._user = user
35         self._proxsock = []
36
37     def ip(self):
38         return self._ip
39
40     def test_connect(self):
41         """Simply try to run 'true' over ssh on remote system.
42         On failure, raise RuntimeWarning exception when possibly worth
43         retrying, and raise RuntimeError exception otherwise.
44         """
45         return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
46
47     def connect(self):
48         attempts = 1
49         RapidLog.debug("Trying to connect to machine \
50                 on %s, attempt: %d" % (self._ip, attempts))
51         while True:
52             try:
53                 self.test_connect()
54                 break
55             except RuntimeWarning as ex:
56                 RapidLog.debug("RuntimeWarning %d:\n%s"
57                     % (ex.returncode, ex.output.strip()))
58                 attempts += 1
59                 if attempts > 20:
60                     RapidLog.exception("Failed to connect to instance after %d\
61                             attempts:\n%s" % (attempts, ex))
62                 time.sleep(2)
63                 RapidLog.debug("Trying to connect to machine \
64                        on %s, attempt: %d" % (self._ip, attempts))
65         RapidLog.debug("Connected to machine on %s" % self._ip)
66
67     def connect_socket(self):
68         attempts = 1
69         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
70                 attempt: %d" % (self._ip, attempts))
71         sock = None
72         while True:
73             sock = self.prox_sock()
74             if sock is not None:
75                 break
76             attempts += 1
77             if attempts > 20:
78                 RapidLog.exception("Failed to connect to PROX on %s after %d \
79                         attempts" % (self._ip, attempts))
80             time.sleep(2)
81             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
82                     attempt: %d" % (self._ip, attempts))
83         RapidLog.info("Connected to PROX on %s" % self._ip)
84         return sock
85
86     def close(self):
87         for sock in self._proxsock:
88             sock.quit()
89
90     def run_cmd(self, command, _connect=False):
91         """Execute command over ssh on remote system.
92         Wait for remote command completion.
93         Return command output (combined stdout and stderr).
94         _connect argument is reserved for connect() method.
95         """
96         cmd = self._build_ssh(command)
97         try:
98             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
99         except subprocess.CalledProcessError as ex:
100             RapidLog.exception('ssh returned exit status %d:\n%s'
101                     % (ex.returncode, ex.output.strip()))
102
103     def prox_sock(self, port=8474):
104         """Connect to the PROX instance on remote system.
105         Return a prox_sock object on success, None on failure.
106         """
107         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
108         try:
109             sock.connect((self._ip, port))
110             prox = prox_sock(sock)
111             self._proxsock.append(prox)
112             return prox
113         except:
114             return None
115
116     def scp_put(self, src, dst):
117         """Copy src file from local system to dst on remote system."""
118         cmd = [ 'scp',
119                 '-B',
120                 '-oStrictHostKeyChecking=no',
121                 '-oUserKnownHostsFile=/dev/null',
122                 '-oLogLevel=ERROR' ]
123         if self._key is not None:
124             cmd.extend(['-i', self._key])
125         cmd.append(src)
126         remote = ''
127         if self._user is not None:
128             remote += self._user + '@'
129         remote += self._ip + ':' + dst
130         cmd.append(remote)
131         try:
132             # Actually ignore output on success, but capture stderr on failure
133             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
134         except subprocess.CalledProcessError as ex:
135             RapidLog.exception('scp returned exit status %d:\n%s'
136                     % (ex.returncode, ex.output.strip()))
137
138     def scp_get(self, src, dst):
139         """Copy src file from remote system to dst on local system."""
140         cmd = [ 'scp',
141                 '-B',
142                 '-oStrictHostKeyChecking=no',
143                 '-oUserKnownHostsFile=/dev/null',
144                 '-oLogLevel=ERROR' ]
145         if self._key is not None:
146             cmd.extend(['-i', self._key])
147         remote = ''
148         if self._user is not None:
149             remote += self._user + '@'
150         remote += self._ip + ':/home/' + self._user + src
151         cmd.append(remote)
152         cmd.append(dst)
153         try:
154             # Actually ignore output on success, but capture stderr on failure
155             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
156         except subprocess.CalledProcessError as ex:
157             RapidLog.exception('scp returned exit status %d:\n%s'
158                     % (ex.returncode, ex.output.strip()))
159
160     def _build_ssh(self, command):
161         cmd = [ 'ssh',
162                 '-oBatchMode=yes',
163                 '-oStrictHostKeyChecking=no',
164                 '-oUserKnownHostsFile=/dev/null',
165                 '-oLogLevel=ERROR' ]
166         if self._key is not None:
167             cmd.extend(['-i', self._key])
168         remote = ''
169         if self._user is not None:
170             remote += self._user + '@'
171         remote += self._ip
172         cmd.append(remote)
173         cmd.append(command)
174         return cmd
175
176 class prox_sock(object):
177     def __init__(self, sock):
178         self._sock = sock
179         self._rcvd = b''
180
181     def __del__(self):
182         if self._sock is not None:
183             self._sock.close()
184             self._sock = None
185
186     def start(self, cores):
187         self._send('start %s' % ','.join(map(str, cores)))
188
189     def stop(self, cores):
190         self._send('stop %s' % ','.join(map(str, cores)))
191
192     def speed(self, speed, cores, tasks=[0]):
193         for core in cores:
194             for task in tasks:
195                 self._send('speed %s %s %s' % (core, task, speed))
196
197     def reset_stats(self):
198         self._send('reset stats')
199
200     def lat_stats(self, cores, tasks=[0]):
201         result = {}
202         result['lat_min'] = 999999999
203         result['lat_max'] = result['lat_avg'] = 0
204         number_tasks_returning_stats = 0
205         result['buckets'] = [0] * 128
206         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
207             ','.join(map(str, tasks))))
208         for core in cores:
209             for task in tasks:
210                 stats = self._recv().split(',')
211             if 'is not measuring' in stats[0]:
212                 continue
213             if stats[0].startswith('error'):
214                 RapidLog.critical("lat stats error: unexpected reply from PROX\
215                         (potential incompatibility between scripts and PROX)")
216                 raise Exception("lat stats error")
217             number_tasks_returning_stats += 1
218             result['lat_min'] = min(int(stats[0]),result['lat_min'])
219             result['lat_max'] = max(int(stats[1]),result['lat_max'])
220             result['lat_avg'] += int(stats[2])
221             #min_since begin = int(stats[3])
222             #max_since_begin = int(stats[4])
223             result['lat_tsc'] = int(stats[5]) # Taking the last tsc as the timestamp since
224                                 # PROX will return the same tsc for each 
225                                 # core/task combination 
226             result['lat_hz'] = int(stats[6])
227             #coreid = int(stats[7])
228             #taskid = int(stats[8])
229             mis_ordered = int(stats[9])
230             extent = int(stats[10])
231             duplicate = int(stats[11])
232             stats = self._recv().split(':')
233             if stats[0].startswith('error'):
234                 RapidLog.critical("lat stats error: unexpected lat bucket \
235                         reply (potential incompatibility between scripts \
236                         and PROX)")
237                 raise Exception("lat bucket reply error")
238             result['buckets'][0] = int(stats[1])
239             for i in range(1, 128):
240                 stats = self._recv().split(':')
241                 result['buckets'][i] = int(stats[1])
242         result['lat_avg'] = old_div(result['lat_avg'],
243                 number_tasks_returning_stats)
244         self._send('stats latency(0).used')
245         used = float(self._recv())
246         self._send('stats latency(0).total')
247         total = float(self._recv())
248         result['lat_used'] = old_div(used,total)
249         return (result)
250
251     def irq_stats(self, core, bucket, task=0):
252         self._send('stats task.core(%s).task(%s).irq(%s)' % 
253                 (core, task, bucket))
254         stats = self._recv().split(',')
255         return int(stats[0])
256
257     def show_irq_buckets(self, core, task=0):
258         rx = tx = drop = tsc = hz = 0
259         self._send('show irq buckets %s %s' % (core,task))
260         buckets = self._recv().split(';')
261         buckets = buckets[:-1]
262         return buckets
263
264     def core_stats(self, cores, tasks=[0]):
265         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
266         self._send('dp core stats %s %s' % (','.join(map(str, cores)), 
267             ','.join(map(str, tasks))))
268         for core in cores:
269             for task in tasks:
270                 stats = self._recv().split(',')
271                 if stats[0].startswith('error'):  
272                     if stats[0].startswith('error: invalid syntax'):
273                         RapidLog.critical("dp core stats error: unexpected \
274                                 invalid syntax (potential incompatibility \
275                                 between scripts and PROX)")
276                         raise Exception("dp core stats error")
277                     continue
278                 rx += int(stats[0])
279                 tx += int(stats[1])
280                 rx_non_dp += int(stats[2])
281                 tx_non_dp += int(stats[3])
282                 drop += int(stats[4])
283                 tx_fail += int(stats[5])
284                 tsc = int(stats[6])
285                 hz = int(stats[7])
286         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
287
288     def multi_port_stats(self, ports=[0]):
289         rx = tx = port_id = tsc = no_mbufs = errors = 0
290         self._send('multi port stats %s' % (','.join(map(str, ports))))
291         result = self._recv().split(';')
292         if result[0].startswith('error'):  
293             RapidLog.critical("multi port stats error: unexpected invalid \
294                     syntax (potential incompatibility between scripts and \
295                     PROX)")
296             raise Exception("multi port stats error")
297         for statistics in result:
298             stats = statistics.split(',')
299             port_id = int(stats[0])
300             rx += int(stats[1])
301             tx += int(stats[2])
302             no_mbufs += int(stats[3])
303             errors += int(stats[4])
304             tsc = int(stats[5])
305         return rx, tx, no_mbufs, errors, tsc
306
307     def set_random(self, cores, task, offset, mask, length):
308         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), 
309             task, offset, mask, length))
310
311     def set_size(self, cores, task, pkt_size):
312         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, 
313             pkt_size))
314
315     def set_imix(self, cores, task, imix):
316         self._send('imix %s %s %s' % (','.join(map(str, cores)), task, 
317             ','.join(map(str,imix))))
318
319     def set_value(self, cores, task, offset, value, length):
320         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), 
321             task, offset, value, length))
322
323     def quit_prox(self):
324         self._send('quit')
325
326     def _send(self, cmd):
327         """Append LF and send command to the PROX instance."""
328         if self._sock is None:
329             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
330         self._sock.sendall(cmd.encode() + b'\n')
331
332     def _recv(self):
333         """Receive response from PROX instance, return it with LF removed."""
334         if self._sock is None:
335             raise RuntimeError("PROX socket closed, cannot receive anymore")
336         pos = self._rcvd.find(b'\n')
337         while pos == -1:
338             self._rcvd += self._sock.recv(256)
339             pos = self._rcvd.find(b'\n')
340         rsp = self._rcvd[:pos]
341         self._rcvd = self._rcvd[pos+1:]
342         return rsp.decode()