6e25e7f4fb941fa10a52da8097a4d7eec6821c65
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog 
29
30 class prox_ctrl(object):
31     def __init__(self, ip, key=None, user=None):
32         self._ip   = ip
33         self._key  = key
34         self._user = user
35         self._children = []
36         self._proxsock = []
37
38     def __del__(self):
39         self.close()
40
41     def ip(self):
42         return self._ip
43
44     def test_connect(self):
45         """Simply try to run 'true' over ssh on remote system.
46         On failure, raise RuntimeWarning exception when possibly worth
47         retrying, and raise RuntimeError exception otherwise.
48         """
49         return self.run_cmd('true', True)
50
51     def connect(self):
52         attempts = 1
53         RapidLog.debug("Trying to connect to VM which was just launched on %s, attempt: %d" % (self._ip, attempts))
54         while True:
55             try:
56                 self.test_connect()
57                 break
58             except RuntimeWarning as ex:
59                 attempts += 1
60                 if attempts > 20:
61                     RapidLog.exception("Failed to connect to VM after %d attempts:\n%s" % (attempts, ex))
62                     raise Exception("Failed to connect to VM after %d attempts:\n%s" % (attempts, ex))
63                 time.sleep(2)
64                 RapidLog.debug("Trying to connect to VM which was just launched on %s, attempt: %d" % (self._ip, attempts))
65         RapidLog.debug("Connected to VM on %s" % self._ip)
66
67     def connect_socket(self):
68         attempts = 1
69         RapidLog.debug("Trying to connect to PROX (just launched) on %s, attempt: %d" % (self._ip, attempts))
70         sock = None
71         while True:
72             sock = self.prox_sock()
73             if sock is not None:
74                 break
75             attempts += 1
76             if attempts > 20:
77                 RapidLog.exception("Failed to connect to PROX on %s after %d attempts" % (self._ip, attempts))
78                 raise Exception("Failed to connect to PROX on %s after %d attempts" % (self._ip, attempts))
79             time.sleep(2)
80             RapidLog.debug("Trying to connect to PROX (just launched) on %s, attempt: %d" % (self._ip, attempts))
81         RapidLog.info("Connected to PROX on %s" % self._ip)
82         return sock
83
84     def close(self):
85         """Must be called before program termination."""
86         for sock in self._proxsock:
87             sock.quit()
88         children = len(self._children)
89         if children == 0:
90             return
91         if children > 1:
92             print('Waiting for %d child processes to complete ...' % children)
93         for child in self._children:
94             ret = os.waitpid(child[0], os.WNOHANG)
95             if ret[0] == 0:
96                 print("Waiting for child process '%s' to complete ..." % child[1])
97                 ret = os.waitpid(child[0], 0)
98             rc = ret[1]
99             if os.WIFEXITED(rc):
100                 if os.WEXITSTATUS(rc) == 0:
101                     print("Child process '%s' completed successfully" % child[1])
102                 else:
103                     print("Child process '%s' returned exit status %d" % (
104                             child[1], os.WEXITSTATUS(rc)))
105             elif os.WIFSIGNALED(rc):
106                 print("Child process '%s' exited on signal %d" % (
107                         child[1], os.WTERMSIG(rc)))
108             else:
109                 print("Wait status for child process '%s' is 0x%04x" % (
110                         child[1], rc))
111
112     def run_cmd(self, command, _connect=False):
113         """Execute command over ssh on remote system.
114         Wait for remote command completion.
115         Return command output (combined stdout and stderr).
116         _connect argument is reserved for connect() method.
117         """
118         cmd = self._build_ssh(command)
119         try:
120             return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
121         except subprocess.CalledProcessError as ex:
122             if _connect and ex.returncode == 255:
123                 raise RuntimeWarning(ex.output.strip())
124             raise RuntimeError('ssh returned exit status %d:\n%s'
125                     % (ex.returncode, ex.output.strip()))
126
127     def fork_cmd(self, command, name=None):
128         """Execute command over ssh on remote system, in a child process.
129         Do not wait for remote command completion.
130         Return child process id.
131         """
132         if name is None:
133             name = command
134         cmd = self._build_ssh(command)
135         pid = os.fork()
136         if (pid != 0):
137             # In the parent process
138             self._children.append((pid, name))
139             return pid
140         # In the child process: use os._exit to terminate
141         try:
142             # Actually ignore output on success, but capture stderr on failure
143             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
144         except subprocess.CalledProcessError as ex:
145             raise RuntimeError("Child process '%s' failed:\n"
146                     'ssh returned exit status %d:\n%s'
147                     % (name, ex.returncode, ex.output.strip()))
148         os._exit(0)
149
150     def prox_sock(self, port=8474):
151         """Connect to the PROX instance on remote system.
152         Return a prox_sock object on success, None on failure.
153         """
154         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
155         try:
156             sock.connect((self._ip, port))
157             prox = prox_sock(sock)
158             self._proxsock.append(prox)
159             return prox
160         except:
161             return None
162
163     def scp_put(self, src, dst):
164         """Copy src file from local system to dst on remote system."""
165         cmd = [ 'scp',
166                 '-B',
167                 '-oStrictHostKeyChecking=no',
168                 '-oUserKnownHostsFile=/dev/null',
169                 '-oLogLevel=ERROR' ]
170         if self._key is not None:
171             cmd.extend(['-i', self._key])
172         cmd.append(src)
173         remote = ''
174         if self._user is not None:
175             remote += self._user + '@'
176         remote += self._ip + ':' + dst
177         cmd.append(remote)
178         try:
179             # Actually ignore output on success, but capture stderr on failure
180             subprocess.check_output(cmd, stderr=subprocess.STDOUT)
181         except subprocess.CalledProcessError as ex:
182             raise RuntimeError('scp returned exit status %d:\n%s'
183                     % (ex.returncode, ex.output.strip()))
184
185     def _build_ssh(self, command):
186         cmd = [ 'ssh',
187                 '-oBatchMode=yes',
188                 '-oStrictHostKeyChecking=no',
189                 '-oUserKnownHostsFile=/dev/null',
190                 '-oLogLevel=ERROR' ]
191         if self._key is not None:
192             cmd.extend(['-i', self._key])
193         remote = ''
194         if self._user is not None:
195             remote += self._user + '@'
196         remote += self._ip
197         cmd.append(remote)
198         cmd.append(command)
199         return cmd
200
201 class prox_sock(object):
202     def __init__(self, sock):
203         self._sock = sock
204         self._rcvd = b''
205
206     def __del__(self):
207         self.quit()
208
209     def quit(self):
210         if self._sock is not None:
211             self._send('quit')
212             self._sock.close()
213             self._sock = None
214
215     def start(self, cores):
216         self._send('start %s' % ','.join(map(str, cores)))
217
218     def stop(self, cores):
219         self._send('stop %s' % ','.join(map(str, cores)))
220
221     def speed(self, speed, cores, tasks=[0]):
222         for core in cores:
223             for task in tasks:
224                 self._send('speed %s %s %s' % (core, task, speed))
225
226     def reset_stats(self):
227         self._send('reset stats')
228
229     def lat_stats(self, cores, tasks=[0]):
230         min_lat = 999999999
231         max_lat = avg_lat = 0
232         number_tasks_returning_stats = 0
233         buckets = [0] * 128
234         self._send('lat all stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
235         for core in cores:
236             for task in tasks:
237                 stats = self._recv().split(',')
238             if 'is not measuring' in stats[0]:
239                 continue
240             if stats[0].startswith('error'):
241                 RapidLog.critical("lat stats error: unexpected reply from PROX (potential incompatibility between scripts and PROX)")
242                 raise Exception("lat stats error")
243             number_tasks_returning_stats += 1
244             min_lat = min(int(stats[0]),min_lat)
245             max_lat = max(int(stats[1]),max_lat)
246             avg_lat += int(stats[2])
247             #min_since begin = int(stats[3])
248             #max_since_begin = int(stats[4])
249             tsc = int(stats[5]) # Taking the last tsc as the timestamp since PROX will return the same tsc for each core/task combination 
250             hz = int(stats[6])
251             #coreid = int(stats[7])
252             #taskid = int(stats[8])
253             stats = self._recv().split(':')
254             if stats[0].startswith('error'):
255                 RapidLog.critical("lat stats error: unexpected lat bucket reply (potential incompatibility between scripts and PROX)")
256                 raise Exception("lat bucket reply error")
257             buckets[0] = int(stats[1])
258             for i in range(1, 128):
259                 stats = self._recv().split(':')
260                 buckets[i] = int(stats[1])
261         avg_lat = old_div(avg_lat,number_tasks_returning_stats)
262         self._send('stats latency(0).used')
263         used = float(self._recv())
264         self._send('stats latency(0).total')
265         total = float(self._recv())
266         return min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz, buckets
267
268     def irq_stats(self, core, bucket, task=0):
269         self._send('stats task.core(%s).task(%s).irq(%s)' % (core, task, bucket))
270         stats = self._recv().split(',')
271         return int(stats[0])
272
273     def show_irq_buckets(self, core, task=0):
274         rx = tx = drop = tsc = hz = 0
275         self._send('show irq buckets %s %s' % (core,task))
276         buckets = self._recv().split(';')
277         buckets = buckets[:-1]
278         return buckets
279
280     def core_stats(self, cores, tasks=[0]):
281         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
282         self._send('dp core stats %s %s' % (','.join(map(str, cores)), ','.join(map(str, tasks))))
283         for core in cores:
284             for task in tasks:
285                 stats = self._recv().split(',')
286                 if stats[0].startswith('error'):  
287                     if stats[0].startswith('error: invalid syntax'):
288                         RapidLog.critical("dp core stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
289                         raise Exception("dp core stats error")
290                     continue
291                 rx += int(stats[0])
292                 tx += int(stats[1])
293                 rx_non_dp += int(stats[2])
294                 tx_non_dp += int(stats[3])
295                 drop += int(stats[4])
296                 tx_fail += int(stats[5])
297                 tsc = int(stats[6])
298                 hz = int(stats[7])
299         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
300
301     def multi_port_stats(self, ports=[0]):
302         rx = tx = port_id = tsc = no_mbufs = errors = 0
303         self._send('multi port stats %s' % (','.join(map(str, ports))))
304         result = self._recv().split(';')
305         if result[0].startswith('error'):  
306             RapidLog.critical("multi port stats error: unexpected invalid syntax (potential incompatibility between scripts and PROX)")
307             raise Exception("multi port stats error")
308         for statistics in result:
309             stats = statistics.split(',')
310             port_id = int(stats[0])
311             rx += int(stats[1])
312             tx += int(stats[2])
313             no_mbufs += int(stats[3])
314             errors += int(stats[4])
315             tsc = int(stats[5])
316         return rx, tx, no_mbufs, errors, tsc
317
318     def set_random(self, cores, task, offset, mask, length):
319         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, mask, length))
320
321     def set_size(self, cores, task, pkt_size):
322         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task, pkt_size))
323
324     def set_value(self, cores, task, offset, value, length):
325         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)), task, offset, value, length))
326
327     def _send(self, cmd):
328         """Append LF and send command to the PROX instance."""
329         if self._sock is None:
330             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
331         self._sock.sendall(cmd.encode() + b'\n')
332
333     def _recv(self):
334         """Receive response from PROX instance, and return it with LF removed."""
335         if self._sock is None:
336             raise RuntimeError("PROX socket closed, cannot receive anymore")
337         pos = self._rcvd.find(b'\n')
338         while pos == -1:
339             self._rcvd += self._sock.recv(256)
340             pos = self._rcvd.find(b'\n')
341         rsp = self._rcvd[:pos]
342         self._rcvd = self._rcvd[pos+1:]
343         return rsp.decode()