2 ## Copyright (c) 2010-2020 Intel Corporation
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
8 ## http://www.apache.org/licenses/LICENSE-2.0
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
17 from __future__ import print_function
18 from __future__ import division
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
28 from rapid_log import RapidLog
30 class prox_ctrl(object):
31 def __init__(self, ip, key=None, user=None):
44 def test_connect(self):
45 """Simply try to run 'true' over ssh on remote system.
46 On failure, raise RuntimeWarning exception when possibly worth
47 retrying, and raise RuntimeError exception otherwise.
49 return self.run_cmd('test -e /opt/rapid/system_ready_for_rapid', True)
53 RapidLog.debug("Trying to connect to instance which was just launched \
54 on %s, attempt: %d" % (self._ip, attempts))
59 except RuntimeWarning as ex:
62 RapidLog.exception("Failed to connect to instance after %d\
63 attempts:\n%s" % (attempts, ex))
64 raise Exception("Failed to connect to instance after %d \
65 attempts:\n%s" % (attempts, ex))
67 RapidLog.debug("Trying to connect to instance which was just \
68 launched on %s, attempt: %d" % (self._ip, attempts))
69 RapidLog.debug("Connected to instance on %s" % self._ip)
71 def connect_socket(self):
73 RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
74 attempt: %d" % (self._ip, attempts))
77 sock = self.prox_sock()
82 RapidLog.exception("Failed to connect to PROX on %s after %d \
83 attempts" % (self._ip, attempts))
84 raise Exception("Failed to connect to PROX on %s after %d \
85 attempts" % (self._ip, attempts))
87 RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
88 attempt: %d" % (self._ip, attempts))
89 RapidLog.info("Connected to PROX on %s" % self._ip)
93 """Must be called before program termination."""
94 for sock in self._proxsock:
96 children = len(self._children)
100 print('Waiting for %d child processes to complete ...' % children)
101 for child in self._children:
102 ret = os.waitpid(child[0], os.WNOHANG)
104 print("Waiting for child process '%s' to complete ..."
106 ret = os.waitpid(child[0], 0)
109 if os.WEXITSTATUS(rc) == 0:
110 print("Child process '%s' completed successfully"
113 print("Child process '%s' returned exit status %d" % (
114 child[1], os.WEXITSTATUS(rc)))
115 elif os.WIFSIGNALED(rc):
116 print("Child process '%s' exited on signal %d" % (
117 child[1], os.WTERMSIG(rc)))
119 print("Wait status for child process '%s' is 0x%04x" % (
122 def run_cmd(self, command, _connect=False):
123 """Execute command over ssh on remote system.
124 Wait for remote command completion.
125 Return command output (combined stdout and stderr).
126 _connect argument is reserved for connect() method.
128 cmd = self._build_ssh(command)
130 return subprocess.check_output(cmd, stderr=subprocess.STDOUT)
131 except subprocess.CalledProcessError as ex:
132 #if _connect and ex.returncode == 255:
134 raise RuntimeWarning(ex.output.strip())
135 raise RuntimeError('ssh returned exit status %d:\n%s'
136 % (ex.returncode, ex.output.strip()))
138 def fork_cmd(self, command, name=None):
139 """Execute command over ssh on remote system, in a child process.
140 Do not wait for remote command completion.
141 Return child process id.
145 cmd = self._build_ssh(command)
148 # In the parent process
149 self._children.append((pid, name))
151 # In the child process: use os._exit to terminate
153 # Actually ignore output on success, but capture stderr on failure
154 subprocess.check_output(cmd, stderr=subprocess.STDOUT)
155 except subprocess.CalledProcessError as ex:
156 raise RuntimeError("Child process '%s' failed:\n"
157 'ssh returned exit status %d:\n%s'
158 % (name, ex.returncode, ex.output.strip()))
161 def prox_sock(self, port=8474):
162 """Connect to the PROX instance on remote system.
163 Return a prox_sock object on success, None on failure.
165 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
167 sock.connect((self._ip, port))
168 prox = prox_sock(sock)
169 self._proxsock.append(prox)
174 def scp_put(self, src, dst):
175 """Copy src file from local system to dst on remote system."""
178 '-oStrictHostKeyChecking=no',
179 '-oUserKnownHostsFile=/dev/null',
181 if self._key is not None:
182 cmd.extend(['-i', self._key])
185 if self._user is not None:
186 remote += self._user + '@'
187 remote += self._ip + ':' + dst
190 # Actually ignore output on success, but capture stderr on failure
191 subprocess.check_output(cmd, stderr=subprocess.STDOUT)
192 except subprocess.CalledProcessError as ex:
193 raise RuntimeError('scp returned exit status %d:\n%s'
194 % (ex.returncode, ex.output.strip()))
196 def scp_get(self, src, dst):
197 """Copy src file from remote system to dst on local system."""
200 '-oStrictHostKeyChecking=no',
201 '-oUserKnownHostsFile=/dev/null',
203 if self._key is not None:
204 cmd.extend(['-i', self._key])
206 if self._user is not None:
207 remote += self._user + '@'
208 remote += self._ip + ':/home/' + self._user + src
212 # Actually ignore output on success, but capture stderr on failure
213 subprocess.check_output(cmd, stderr=subprocess.STDOUT)
214 except subprocess.CalledProcessError as ex:
215 raise RuntimeError('scp returned exit status %d:\n%s'
216 % (ex.returncode, ex.output.strip()))
218 def _build_ssh(self, command):
221 '-oStrictHostKeyChecking=no',
222 '-oUserKnownHostsFile=/dev/null',
224 if self._key is not None:
225 cmd.extend(['-i', self._key])
227 if self._user is not None:
228 remote += self._user + '@'
234 class prox_sock(object):
235 def __init__(self, sock):
243 if self._sock is not None:
def start(self, cores):
    """Send a 'start' command for the given cores.

    cores is an iterable of core identifiers; each is converted with str()
    and the results are joined with commas into a single command.
    """
    core_list = ','.join(str(core) for core in cores)
    self._send('start %s' % core_list)
def stop(self, cores):
    """Send a 'stop' command for the given cores.

    cores is an iterable of core identifiers; each is converted with str()
    and the results are joined with commas into a single command.
    """
    core_list = ','.join(str(core) for core in cores)
    self._send('stop %s' % core_list)
254 def speed(self, speed, cores, tasks=[0]):
257 self._send('speed %s %s %s' % (core, task, speed))
def reset_stats(self):
    """Send the fixed 'reset stats' command through _send."""
    command = 'reset stats'
    self._send(command)
262 def lat_stats(self, cores, tasks=[0]):
264 max_lat = avg_lat = 0
265 number_tasks_returning_stats = 0
267 self._send('lat all stats %s %s' % (','.join(map(str, cores)),
268 ','.join(map(str, tasks))))
271 stats = self._recv().split(',')
272 if 'is not measuring' in stats[0]:
274 if stats[0].startswith('error'):
275 RapidLog.critical("lat stats error: unexpected reply from PROX\
276 (potential incompatibility between scripts and PROX)")
277 raise Exception("lat stats error")
278 number_tasks_returning_stats += 1
279 min_lat = min(int(stats[0]),min_lat)
280 max_lat = max(int(stats[1]),max_lat)
281 avg_lat += int(stats[2])
282 #min_since begin = int(stats[3])
283 #max_since_begin = int(stats[4])
284 tsc = int(stats[5]) # Taking the last tsc as the timestamp since
285 # PROX will return the same tsc for each
286 # core/task combination
288 #coreid = int(stats[7])
289 #taskid = int(stats[8])
290 stats = self._recv().split(':')
291 if stats[0].startswith('error'):
292 RapidLog.critical("lat stats error: unexpected lat bucket \
293 reply (potential incompatibility between scripts \
295 raise Exception("lat bucket reply error")
296 buckets[0] = int(stats[1])
297 for i in range(1, 128):
298 stats = self._recv().split(':')
299 buckets[i] = int(stats[1])
300 avg_lat = old_div(avg_lat,number_tasks_returning_stats)
301 self._send('stats latency(0).used')
302 used = float(self._recv())
303 self._send('stats latency(0).total')
304 total = float(self._recv())
305 return (min_lat, max_lat, avg_lat, (old_div(used,total)), tsc, hz,
308 def irq_stats(self, core, bucket, task=0):
309 self._send('stats task.core(%s).task(%s).irq(%s)' %
310 (core, task, bucket))
311 stats = self._recv().split(',')
314 def show_irq_buckets(self, core, task=0):
315 rx = tx = drop = tsc = hz = 0
316 self._send('show irq buckets %s %s' % (core,task))
317 buckets = self._recv().split(';')
318 buckets = buckets[:-1]
321 def core_stats(self, cores, tasks=[0]):
322 rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
323 self._send('dp core stats %s %s' % (','.join(map(str, cores)),
324 ','.join(map(str, tasks))))
327 stats = self._recv().split(',')
328 if stats[0].startswith('error'):
329 if stats[0].startswith('error: invalid syntax'):
330 RapidLog.critical("dp core stats error: unexpected \
331 invalid syntax (potential incompatibility \
332 between scripts and PROX)")
333 raise Exception("dp core stats error")
337 rx_non_dp += int(stats[2])
338 tx_non_dp += int(stats[3])
339 drop += int(stats[4])
340 tx_fail += int(stats[5])
343 return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
345 def multi_port_stats(self, ports=[0]):
346 rx = tx = port_id = tsc = no_mbufs = errors = 0
347 self._send('multi port stats %s' % (','.join(map(str, ports))))
348 result = self._recv().split(';')
349 if result[0].startswith('error'):
350 RapidLog.critical("multi port stats error: unexpected invalid \
351 syntax (potential incompatibility between scripts and \
353 raise Exception("multi port stats error")
354 for statistics in result:
355 stats = statistics.split(',')
356 port_id = int(stats[0])
359 no_mbufs += int(stats[3])
360 errors += int(stats[4])
362 return rx, tx, no_mbufs, errors, tsc
def set_random(self, cores, task, offset, mask, length):
    """Send a 'set random' command for the given cores and task.

    cores is an iterable that is rendered as a comma-separated list; task,
    offset, mask and length are inserted verbatim via %s formatting.
    NOTE(review): the meaning of offset/mask/length is defined by PROX and
    is not visible here -- confirm against the PROX command reference.
    """
    core_list = ','.join(str(core) for core in cores)
    fields = (core_list, task, offset, mask, length)
    self._send('set random %s %s %s %s %s' % fields)
368 def set_size(self, cores, task, pkt_size):
369 self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
def set_imix(self, cores, task, imix):
    """Send an 'imix' command for the given cores and task.

    Both cores and imix are iterables; each is rendered as a
    comma-separated list of its str()-converted items. task is inserted
    verbatim via %s formatting.
    """
    core_list = ','.join(str(core) for core in cores)
    imix_list = ','.join(str(entry) for entry in imix)
    self._send('imix %s %s %s' % (core_list, task, imix_list))
def set_value(self, cores, task, offset, value, length):
    """Send a 'set value' command for the given cores and task.

    cores is an iterable that is rendered as a comma-separated list; task,
    offset, value and length are inserted verbatim via %s formatting.
    NOTE(review): the meaning of offset/value/length is defined by PROX and
    is not visible here -- confirm against the PROX command reference.
    """
    core_list = ','.join(str(core) for core in cores)
    fields = (core_list, task, offset, value, length)
    self._send('set value %s %s %s %s %s' % fields)
def _send(self, cmd):
    """Send one command to the PROX instance, terminated by a single LF.

    cmd is a str; it is encoded with str.encode() and written with
    sendall() on self._sock.
    Raises RuntimeError when self._sock is None.
    """
    sock = self._sock
    if sock is None:
        raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
    payload = cmd.encode() + b'\n'
    sock.sendall(payload)
387 """Receive response from PROX instance, return it with LF removed."""
388 if self._sock is None:
389 raise RuntimeError("PROX socket closed, cannot receive anymore")
390 pos = self._rcvd.find(b'\n')
392 self._rcvd += self._sock.recv(256)
393 pos = self._rcvd.find(b'\n')
394 rsp = self._rcvd[:pos]
395 self._rcvd = self._rcvd[pos+1:]