ramp generator traffic & prox.log copy
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog 
29
30 class prox_ctrl(object):
31     def __init__(self, ip, key=None, user=None):
32         self._ip   = ip
33         self._key  = key
34         self._user = user
35         self._children = []
36         self._proxsock = []
37
38     def __del__(self):
39         self.close()
40
41     def ip(self):
42         return self._ip
43
44     def test_connect(self):
45         """Simply try to run 'true' over ssh on remote system.
46         On failure, raise RuntimeWarning exception when possibly worth
47         retrying, and raise RuntimeError exception otherwise.
48         """
49         return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
50
51     def connect(self):
52         attempts = 1
53         RapidLog.debug("Trying to connect to instance which was just launched \
54                 on %s, attempt: %d" % (self._ip, attempts))
55         while True:
56             try:
57                 self.test_connect()
58                 break
59             except RuntimeWarning as ex:
60                 attempts += 1
61                 if attempts > 20:
62                     RapidLog.exception("Failed to connect to instance after %d\
63                             attempts:\n%s" % (attempts, ex))
64                     raise Exception("Failed to connect to instance after %d \
65                             attempts:\n%s" % (attempts, ex))
66                 time.sleep(2)
67                 RapidLog.debug("Trying to connect to instance which was just \
68                         launched on %s, attempt: %d" % (self._ip, attempts))
69         RapidLog.debug("Connected to instance on %s" % self._ip)
70
71     def connect_socket(self):
72         attempts = 1
73         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
74                 attempt: %d" % (self._ip, attempts))
75         sock = None
76         while True:
77             sock = self.prox_sock()
78             if sock is not None:
79                 break
80             attempts += 1
81             if attempts > 20:
82                 RapidLog.exception("Failed to connect to PROX on %s after %d \
83                         attempts" % (self._ip, attempts))
84                 raise Exception("Failed to connect to PROX on %s after %d \
85                         attempts" % (self._ip, attempts))
86             time.sleep(2)
87             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
88                     attempt: %d" % (self._ip, attempts))
89         RapidLog.info("Connected to PROX on %s" % self._ip)
90         return sock
91
92     def close(self):
93         """Must be called before program termination."""
94         for sock in self._proxsock:
95             sock.quit()
96         children = len(self._children)
97         if children == 0:
98             return
99         if children > 1:
100             print('Waiting for %d child processes to complete ...' % children)
101         for child in self._children:
102             ret = os.waitpid(child[0], os.WNOHANG)
103             if ret[0] == 0:
104                 print("Waiting for child process '%s' to complete ..." 
105                         % child[1])
106                 ret = os.waitpid(child[0], 0)
107             rc = ret[1]
108             if os.WIFEXITED(rc):
109                 if os.WEXITSTATUS(rc) == 0:
110                     print("Child process '%s' completed successfully" 
111                             % child[1])
112                 else:
113                     print("Child process '%s' returned exit status %d" % (
114                             child[1], os.WEXITSTATUS(rc)))
115             elif os.WIFSIGNALED(rc):
116                 print("Child process '%s' exited on signal %d" % (
117                         child[1], os.WTERMSIG(rc)))
118             else:
119                 print("Wait status for child process '%s' is 0x%04x" % (
120                         child[1], rc))
121
122     def run_cmd(self, command, _connect=False):
123         """Execute command over ssh on remote system.
124         Wait for remote command completion.
125         Return command output (combined stdout and stderr).
126         _connect argument is reserved for connect() method.
127         """
128         cmd = self._build_ssh(command)
129         try:
130             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
131         except subprocess.CalledProcessError as ex:
132             #if _connect and ex.returncode == 255:
133             if _connect:
134                 raise RuntimeWarning(ex.output.strip())
135             raise RuntimeError('ssh returned exit status %d:\n%s'
136                     % (ex.returncode, ex.output.strip()))
137
138     def fork_cmd(self, command, name=None):
139         """Execute command over ssh on remote system, in a child process.
140         Do not wait for remote command completion.
141         Return child process id.
142         """
143         if name is None:
144             name = command
145         cmd = self._build_ssh(command)
146         pid = os.fork()
147         if (pid != 0):
148             # In the parent process
149             self._children.append((pid, name))
150             return pid
151         # In the child process: use os._exit to terminate
152         try:
153             # Actually ignore output on success, but capture stderr on failure
154             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
155         except subprocess.CalledProcessError as ex:
156             raise RuntimeError("Child process '%s' failed:\n"
157                     'ssh returned exit status %d:\n%s'
158                     % (name, ex.returncode, ex.output.strip()))
159         os._exit(0)
160
161     def prox_sock(self, port=8474):
162         """Connect to the PROX instance on remote system.
163         Return a prox_sock object on success, None on failure.
164         """
165         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
166         try:
167             sock.connect((self._ip, port))
168             prox = prox_sock(sock)
169             self._proxsock.append(prox)
170             return prox
171         except:
172             return None
173
174     def scp_put(self, src, dst):
175         """Copy src file from local system to dst on remote system."""
176         cmd = [ 'scp',
177                 '-B',
178                 '-oStrictHostKeyChecking=no',
179                 '-oUserKnownHostsFile=/dev/null',
180                 '-oLogLevel=ERROR' ]
181         if self._key is not None:
182             cmd.extend(['-i', self._key])
183         cmd.append(src)
184         remote = ''
185         if self._user is not None:
186             remote += self._user + '@'
187         remote += self._ip + ':' + dst
188         cmd.append(remote)
189         try:
190             # Actually ignore output on success, but capture stderr on failure
191             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
192         except subprocess.CalledProcessError as ex:
193             raise RuntimeError('scp returned exit status %d:\n%s'
194                     % (ex.returncode, ex.output.strip()))
195
196     def scp_get(self, src, dst):
197         """Copy src file from remote system to dst on local system."""
198         cmd = [ 'scp',
199                 '-B',
200                 '-oStrictHostKeyChecking=no',
201                 '-oUserKnownHostsFile=/dev/null',
202                 '-oLogLevel=ERROR' ]
203         if self._key is not None:
204             cmd.extend(['-i', self._key])
205         remote = ''
206         if self._user is not None:
207             remote += self._user + '@'
208         remote += self._ip + ':/home/' + self._user + src
209         cmd.append(remote)
210         cmd.append(dst)
211         try:
212             # Actually ignore output on success, but capture stderr on failure
213             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
214         except subprocess.CalledProcessError as ex:
215             raise RuntimeError('scp returned exit status %d:\n%s'
216                     % (ex.returncode, ex.output.strip()))
217
218     def _build_ssh(self, command):
219         cmd = [ 'ssh',
220                 '-oBatchMode=yes',
221                 '-oStrictHostKeyChecking=no',
222                 '-oUserKnownHostsFile=/dev/null',
223                 '-oLogLevel=ERROR' ]
224         if self._key is not None:
225             cmd.extend(['-i', self._key])
226         remote = ''
227         if self._user is not None:
228             remote += self._user + '@'
229         remote += self._ip
230         cmd.append(remote)
231         cmd.append(command)
232         return cmd
233
234 class prox_sock(object):
235     def __init__(self, sock):
236         self._sock = sock
237         self._rcvd = b''
238
239     def __del__(self):
240         self.quit()
241
242     def quit(self):
243         if self._sock is not None:
244             self._send('quit')
245             self._sock.close()
246             self._sock = None
247
248     def start(self, cores):
249         self._send('start %s' % ','.join(map(str, cores)))
250
251     def stop(self, cores):
252         self._send('stop %s' % ','.join(map(str, cores)))
253
254     def speed(self, speed, cores, tasks=[0]):
255         for core in cores:
256             for task in tasks:
257                 self._send('speed %s %s %s' % (core, task, speed))
258
259     def reset_stats(self):
260         self._send('reset stats')
261
262     def lat_stats(self, cores, tasks=[0]):
263         min_lat = 999999999
264         max_lat = avg_lat = 0
265         number_tasks_returning_stats = 0
266         buckets = [0] * 128
267         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
268             ','.join(map(str, tasks))))
269         for core in cores:
270             for task in tasks:
271                 stats = self._recv().split(',')
272             if 'is not measuring' in stats[0]:
273                 continue
274             if stats[0].startswith('error'):
275                 RapidLog.critical("lat stats error: unexpected reply from PROX\
276                         (potential incompatibility between scripts and PROX)")
277                 raise Exception("lat stats error")
278             number_tasks_returning_stats += 1
279             min_lat = min(int(stats[0]),min_lat)
280             max_lat = max(int(stats[1]),max_lat)
281             avg_lat += int(stats[2])
282             #min_since begin = int(stats[3])
283             #max_since_begin = int(stats[4])
284             tsc = int(stats[5]) # Taking the last tsc as the timestamp since
285                                 # PROX will return the same tsc for each 
286                                 # core/task combination 
287             hz = int(stats[6])
288             #coreid = int(stats[7])
289             #taskid = int(stats[8])
290             stats = self._recv().split(':')
291             if stats[0].startswith('error'):
292                 RapidLog.critical("lat stats error: unexpected lat bucket \
293                         reply (potential incompatibility between scripts \
294                         and PROX)")
295                 raise Exception("lat bucket reply error")
296             buckets[0] = int(stats[1])
297             for i in range(1, 128):
298                 stats = self._recv().split(':')
299                 buckets[i] = int(stats[1])
300         avg_lat = old_div(avg_lat,number_tasks_returning_stats)
301         self._send('stats latency(0).used')
302         used = float(self._recv())
303         self._send('stats latency(0).total')
304         total = float(self._recv())
305         return (min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz,
306                 buckets)
307
308     def irq_stats(self, core, bucket, task=0):
309         self._send('stats task.core(%s).task(%s).irq(%s)' % 
310                 (core, task, bucket))
311         stats = self._recv().split(',')
312         return int(stats[0])
313
314     def show_irq_buckets(self, core, task=0):
315         rx = tx = drop = tsc = hz = 0
316         self._send('show irq buckets %s %s' % (core,task))
317         buckets = self._recv().split(';')
318         buckets = buckets[:-1]
319         return buckets
320
321     def core_stats(self, cores, tasks=[0]):
322         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
323         self._send('dp core stats %s %s' % (','.join(map(str, cores)), 
324             ','.join(map(str, tasks))))
325         for core in cores:
326             for task in tasks:
327                 stats = self._recv().split(',')
328                 if stats[0].startswith('error'):  
329                     if stats[0].startswith('error: invalid syntax'):
330                         RapidLog.critical("dp core stats error: unexpected \
331                                 invalid syntax (potential incompatibility \
332                                 between scripts and PROX)")
333                         raise Exception("dp core stats error")
334                     continue
335                 rx += int(stats[0])
336                 tx += int(stats[1])
337                 rx_non_dp += int(stats[2])
338                 tx_non_dp += int(stats[3])
339                 drop += int(stats[4])
340                 tx_fail += int(stats[5])
341                 tsc = int(stats[6])
342                 hz = int(stats[7])
343         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
344
345     def multi_port_stats(self, ports=[0]):
346         rx = tx = port_id = tsc = no_mbufs = errors = 0
347         self._send('multi port stats %s' % (','.join(map(str, ports))))
348         result = self._recv().split(';')
349         if result[0].startswith('error'):  
350             RapidLog.critical("multi port stats error: unexpected invalid \
351                     syntax (potential incompatibility between scripts and \
352                     PROX)")
353             raise Exception("multi port stats error")
354         for statistics in result:
355             stats = statistics.split(',')
356             port_id = int(stats[0])
357             rx += int(stats[1])
358             tx += int(stats[2])
359             no_mbufs += int(stats[3])
360             errors += int(stats[4])
361             tsc = int(stats[5])
362         return rx, tx, no_mbufs, errors, tsc
363
364     def set_random(self, cores, task, offset, mask, length):
365         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), 
366             task, offset, mask, length))
367
368     def set_size(self, cores, task, pkt_size):
369         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, 
370             pkt_size))
371
372     def set_imix(self, cores, task, imix):
373         self._send('imix %s %s %s' % (','.join(map(str, cores)), task, 
374             ','.join(map(str,imix))))
375
376     def set_value(self, cores, task, offset, value, length):
377         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), 
378             task, offset, value, length))
379
380     def _send(self, cmd):
381         """Append LF and send command to the PROX instance."""
382         if self._sock is None:
383             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
384         self._sock.sendall(cmd.encode() + b'\n')
385
386     def _recv(self):
387         """Receive response from PROX instance, return it with LF removed."""
388         if self._sock is None:
389             raise RuntimeError("PROX socket closed, cannot receive anymore")
390         pos = self._rcvd.find(b'\n')
391         while pos == -1:
392             self._rcvd += self._sock.recv(256)
393             pos = self._rcvd.find(b'\n')
394         rsp = self._rcvd[:pos]
395         self._rcvd = self._rcvd[pos+1:]
396         return rsp.decode()