Using python concurrent futures
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog 
29
30 class prox_ctrl(object):
31     def __init__(self, ip, key=None, user=None):
32         self._ip   = ip
33         self._key  = key
34         self._user = user
35         self._proxsock = []
36
37     def __del__(self):
38         self.close()
39
40     def ip(self):
41         return self._ip
42
43     def test_connect(self):
44         """Simply try to run 'true' over ssh on remote system.
45         On failure, raise RuntimeWarning exception when possibly worth
46         retrying, and raise RuntimeError exception otherwise.
47         """
48         return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
49
50     def connect(self):
51         attempts = 1
52         RapidLog.debug("Trying to connect to machine \
53                 on %s, attempt: %d" % (self._ip, attempts))
54         while True:
55             try:
56                 self.test_connect()
57                 break
58             except RuntimeWarning as ex:
59                 attempts += 1
60                 if attempts > 20:
61                     RapidLog.exception("Failed to connect to instance after %d\
62                             attempts:\n%s" % (attempts, ex))
63                     raise Exception("Failed to connect to instance after %d \
64                             attempts:\n%s" % (attempts, ex))
65                 time.sleep(2)
66                 RapidLog.debug("Trying to connect to machine \
67                        on %s, attempt: %d" % (self._ip, attempts))
68         RapidLog.debug("Connected to machine on %s" % self._ip)
69
70     def connect_socket(self):
71         attempts = 1
72         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
73                 attempt: %d" % (self._ip, attempts))
74         sock = None
75         while True:
76             sock = self.prox_sock()
77             if sock is not None:
78                 break
79             attempts += 1
80             if attempts > 20:
81                 RapidLog.exception("Failed to connect to PROX on %s after %d \
82                         attempts" % (self._ip, attempts))
83                 raise Exception("Failed to connect to PROX on %s after %d \
84                         attempts" % (self._ip, attempts))
85             time.sleep(2)
86             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
87                     attempt: %d" % (self._ip, attempts))
88         RapidLog.info("Connected to PROX on %s" % self._ip)
89         return sock
90
91     def close(self):
92         """Must be called before program termination."""
93         for sock in self._proxsock:
94             sock.quit()
95
96     def run_cmd(self, command, _connect=False):
97         """Execute command over ssh on remote system.
98         Wait for remote command completion.
99         Return command output (combined stdout and stderr).
100         _connect argument is reserved for connect() method.
101         """
102         cmd = self._build_ssh(command)
103         try:
104             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
105         except subprocess.CalledProcessError as ex:
106             #if _connect and ex.returncode == 255:
107             if _connect:
108                 raise RuntimeWarning(ex.output.strip())
109             raise RuntimeError('ssh returned exit status %d:\n%s'
110                     % (ex.returncode, ex.output.strip()))
111
112     def prox_sock(self, port=8474):
113         """Connect to the PROX instance on remote system.
114         Return a prox_sock object on success, None on failure.
115         """
116         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
117         try:
118             sock.connect((self._ip, port))
119             prox = prox_sock(sock)
120             self._proxsock.append(prox)
121             return prox
122         except:
123             return None
124
125     def scp_put(self, src, dst):
126         """Copy src file from local system to dst on remote system."""
127         cmd = [ 'scp',
128                 '-B',
129                 '-oStrictHostKeyChecking=no',
130                 '-oUserKnownHostsFile=/dev/null',
131                 '-oLogLevel=ERROR' ]
132         if self._key is not None:
133             cmd.extend(['-i', self._key])
134         cmd.append(src)
135         remote = ''
136         if self._user is not None:
137             remote += self._user + '@'
138         remote += self._ip + ':' + dst
139         cmd.append(remote)
140         try:
141             # Actually ignore output on success, but capture stderr on failure
142             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
143         except subprocess.CalledProcessError as ex:
144             raise RuntimeError('scp returned exit status %d:\n%s'
145                     % (ex.returncode, ex.output.strip()))
146
147     def scp_get(self, src, dst):
148         """Copy src file from remote system to dst on local system."""
149         cmd = [ 'scp',
150                 '-B',
151                 '-oStrictHostKeyChecking=no',
152                 '-oUserKnownHostsFile=/dev/null',
153                 '-oLogLevel=ERROR' ]
154         if self._key is not None:
155             cmd.extend(['-i', self._key])
156         remote = ''
157         if self._user is not None:
158             remote += self._user + '@'
159         remote += self._ip + ':/home/' + self._user + src
160         cmd.append(remote)
161         cmd.append(dst)
162         try:
163             # Actually ignore output on success, but capture stderr on failure
164             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
165         except subprocess.CalledProcessError as ex:
166             raise RuntimeError('scp returned exit status %d:\n%s'
167                     % (ex.returncode, ex.output.strip()))
168
169     def _build_ssh(self, command):
170         cmd = [ 'ssh',
171                 '-oBatchMode=yes',
172                 '-oStrictHostKeyChecking=no',
173                 '-oUserKnownHostsFile=/dev/null',
174                 '-oLogLevel=ERROR' ]
175         if self._key is not None:
176             cmd.extend(['-i', self._key])
177         remote = ''
178         if self._user is not None:
179             remote += self._user + '@'
180         remote += self._ip
181         cmd.append(remote)
182         cmd.append(command)
183         return cmd
184
185 class prox_sock(object):
186     def __init__(self, sock):
187         self._sock = sock
188         self._rcvd = b''
189
190     def __del__(self):
191         self.quit()
192
193     def quit(self):
194         if self._sock is not None:
195             self._send('quit')
196             self._sock.close()
197             self._sock = None
198
199     def start(self, cores):
200         self._send('start %s' % ','.join(map(str, cores)))
201
202     def stop(self, cores):
203         self._send('stop %s' % ','.join(map(str, cores)))
204
205     def speed(self, speed, cores, tasks=[0]):
206         for core in cores:
207             for task in tasks:
208                 self._send('speed %s %s %s' % (core, task, speed))
209
210     def reset_stats(self):
211         self._send('reset stats')
212
213     def lat_stats(self, cores, tasks=[0]):
214         min_lat = 999999999
215         max_lat = avg_lat = 0
216         number_tasks_returning_stats = 0
217         buckets = [0] * 128
218         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
219             ','.join(map(str, tasks))))
220         for core in cores:
221             for task in tasks:
222                 stats = self._recv().split(',')
223             if 'is not measuring' in stats[0]:
224                 continue
225             if stats[0].startswith('error'):
226                 RapidLog.critical("lat stats error: unexpected reply from PROX\
227                         (potential incompatibility between scripts and PROX)")
228                 raise Exception("lat stats error")
229             number_tasks_returning_stats += 1
230             min_lat = min(int(stats[0]),min_lat)
231             max_lat = max(int(stats[1]),max_lat)
232             avg_lat += int(stats[2])
233             #min_since begin = int(stats[3])
234             #max_since_begin = int(stats[4])
235             tsc = int(stats[5]) # Taking the last tsc as the timestamp since
236                                 # PROX will return the same tsc for each 
237                                 # core/task combination 
238             hz = int(stats[6])
239             #coreid = int(stats[7])
240             #taskid = int(stats[8])
241             stats = self._recv().split(':')
242             if stats[0].startswith('error'):
243                 RapidLog.critical("lat stats error: unexpected lat bucket \
244                         reply (potential incompatibility between scripts \
245                         and PROX)")
246                 raise Exception("lat bucket reply error")
247             buckets[0] = int(stats[1])
248             for i in range(1, 128):
249                 stats = self._recv().split(':')
250                 buckets[i] = int(stats[1])
251         avg_lat = old_div(avg_lat,number_tasks_returning_stats)
252         self._send('stats latency(0).used')
253         used = float(self._recv())
254         self._send('stats latency(0).total')
255         total = float(self._recv())
256         return (min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz,
257                 buckets)
258
259     def irq_stats(self, core, bucket, task=0):
260         self._send('stats task.core(%s).task(%s).irq(%s)' % 
261                 (core, task, bucket))
262         stats = self._recv().split(',')
263         return int(stats[0])
264
265     def show_irq_buckets(self, core, task=0):
266         rx = tx = drop = tsc = hz = 0
267         self._send('show irq buckets %s %s' % (core,task))
268         buckets = self._recv().split(';')
269         buckets = buckets[:-1]
270         return buckets
271
272     def core_stats(self, cores, tasks=[0]):
273         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
274         self._send('dp core stats %s %s' % (','.join(map(str, cores)), 
275             ','.join(map(str, tasks))))
276         for core in cores:
277             for task in tasks:
278                 stats = self._recv().split(',')
279                 if stats[0].startswith('error'):  
280                     if stats[0].startswith('error: invalid syntax'):
281                         RapidLog.critical("dp core stats error: unexpected \
282                                 invalid syntax (potential incompatibility \
283                                 between scripts and PROX)")
284                         raise Exception("dp core stats error")
285                     continue
286                 rx += int(stats[0])
287                 tx += int(stats[1])
288                 rx_non_dp += int(stats[2])
289                 tx_non_dp += int(stats[3])
290                 drop += int(stats[4])
291                 tx_fail += int(stats[5])
292                 tsc = int(stats[6])
293                 hz = int(stats[7])
294         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
295
296     def multi_port_stats(self, ports=[0]):
297         rx = tx = port_id = tsc = no_mbufs = errors = 0
298         self._send('multi port stats %s' % (','.join(map(str, ports))))
299         result = self._recv().split(';')
300         if result[0].startswith('error'):  
301             RapidLog.critical("multi port stats error: unexpected invalid \
302                     syntax (potential incompatibility between scripts and \
303                     PROX)")
304             raise Exception("multi port stats error")
305         for statistics in result:
306             stats = statistics.split(',')
307             port_id = int(stats[0])
308             rx += int(stats[1])
309             tx += int(stats[2])
310             no_mbufs += int(stats[3])
311             errors += int(stats[4])
312             tsc = int(stats[5])
313         return rx, tx, no_mbufs, errors, tsc
314
315     def set_random(self, cores, task, offset, mask, length):
316         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), 
317             task, offset, mask, length))
318
319     def set_size(self, cores, task, pkt_size):
320         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, 
321             pkt_size))
322
323     def set_imix(self, cores, task, imix):
324         self._send('imix %s %s %s' % (','.join(map(str, cores)), task, 
325             ','.join(map(str,imix))))
326
327     def set_value(self, cores, task, offset, value, length):
328         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), 
329             task, offset, value, length))
330
331     def _send(self, cmd):
332         """Append LF and send command to the PROX instance."""
333         if self._sock is None:
334             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
335         self._sock.sendall(cmd.encode() + b'\n')
336
337     def _recv(self):
338         """Receive response from PROX instance, return it with LF removed."""
339         if self._sock is None:
340             raise RuntimeError("PROX socket closed, cannot receive anymore")
341         pos = self._rcvd.find(b'\n')
342         while pos == -1:
343             self._rcvd += self._sock.recv(256)
344             pos = self._rcvd.find(b'\n')
345         rsp = self._rcvd[:pos]
346         self._rcvd = self._rcvd[pos+1:]
347         return rsp.decode()