c1aade6b0119adf0163489dda9ce1f98992fa182
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog
29 from rapid_sshclient import SSHClient
30
31 class prox_ctrl(object):
32     def __init__(self, ip, key=None, user=None, password = None):
33         self._ip   = ip
34         self._key  = key
35         self._user = user
36         self._password = password
37         self._proxsock = []
38         self._sshclient = SSHClient(ip = ip, user = user, password = password,
39                 rsa_private_key = key, timeout = None)
40
41     def ip(self):
42         return self._ip
43
44     def test_connection(self):
45         attempts = 1
46         RapidLog.debug("Trying to connect to machine \
47                 on %s, attempt: %d" % (self._ip, attempts))
48         while True:
49             try:
50                 self.run_cmd('test -e /opt/rapid/system_ready_for_rapid')
51                 break
52             except RuntimeWarning as ex:
53                 RapidLog.debug("RuntimeWarning %d:\n%s"
54                     % (ex.returncode, ex.output.strip()))
55                 attempts += 1
56                 if attempts > 20:
57                     RapidLog.exception("Failed to connect to instance after %d\
58                             attempts:\n%s" % (attempts, ex))
59                 time.sleep(2)
60                 RapidLog.debug("Trying to connect to machine \
61                        on %s, attempt: %d" % (self._ip, attempts))
62         RapidLog.debug("Connected to machine on %s" % self._ip)
63
64     def connect_socket(self):
65         attempts = 1
66         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
67                 attempt: %d" % (self._ip, attempts))
68         sock = None
69         while True:
70             sock = self.prox_sock()
71             if sock is not None:
72                 break
73             attempts += 1
74             if attempts > 20:
75                 RapidLog.exception("Failed to connect to PROX on %s after %d \
76                         attempts" % (self._ip, attempts))
77             time.sleep(2)
78             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
79                     attempt: %d" % (self._ip, attempts))
80         RapidLog.info("Connected to PROX on %s" % self._ip)
81         return sock
82
83     def close(self):
84         for sock in self._proxsock:
85             sock.quit()
86
87     def run_cmd(self, command):
88         self._sshclient.run_cmd(command)
89         return self._sshclient.get_output()
90
91     def prox_sock(self, port=8474):
92         """Connect to the PROX instance on remote system.
93         Return a prox_sock object on success, None on failure.
94         """
95         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
96         try:
97             sock.connect((self._ip, port))
98             prox = prox_sock(sock)
99             self._proxsock.append(prox)
100             return prox
101         except:
102             return None
103
104     def scp_put(self, src, dst):
105         self._sshclient.scp_put(src, dst)
106
107     def scp_get(self, src, dst):
108         self._sshclient.scp_get('/home/' + self._user + src, dst)
109
110 class prox_sock(object):
111     def __init__(self, sock):
112         self._sock = sock
113         self._rcvd = b''
114
115     def __del__(self):
116         if self._sock is not None:
117             self._sock.close()
118             self._sock = None
119
120     def start(self, cores):
121         self._send('start %s' % ','.join(map(str, cores)))
122
123     def stop(self, cores):
124         self._send('stop %s' % ','.join(map(str, cores)))
125
126     def speed(self, speed, cores, tasks=[0]):
127         for core in cores:
128             for task in tasks:
129                 self._send('speed %s %s %s' % (core, task, speed))
130
131     def reset_stats(self):
132         self._send('reset stats')
133
134     def lat_stats(self, cores, tasks=[0]):
135         result = {}
136         result['lat_min'] = 999999999
137         result['lat_max'] = result['lat_avg'] = 0
138         result['buckets'] = [0] * 128
139         result['mis_ordered'] = 0
140         result['extent'] = 0
141         result['duplicate'] = 0
142         number_tasks_returning_stats = 0
143         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
144             ','.join(map(str, tasks))))
145         for core in cores:
146             for task in tasks:
147                 stats = self._recv().split(',')
148             if 'is not measuring' in stats[0]:
149                 continue
150             if stats[0].startswith('error'):
151                 RapidLog.critical("lat stats error: unexpected reply from PROX\
152                         (potential incompatibility between scripts and PROX)")
153                 raise Exception("lat stats error")
154             number_tasks_returning_stats += 1
155             result['lat_min'] = min(int(stats[0]),result['lat_min'])
156             result['lat_max'] = max(int(stats[1]),result['lat_max'])
157             result['lat_avg'] += int(stats[2])
158             #min_since begin = int(stats[3])
159             #max_since_begin = int(stats[4])
160             result['lat_tsc'] = int(stats[5])
161             # Taking the last tsc as the timestamp since
162             # PROX will return the same tsc for each
163             # core/task combination
164             result['lat_hz'] = int(stats[6])
165             #coreid = int(stats[7])
166             #taskid = int(stats[8])
167             result['mis_ordered'] += int(stats[9])
168             result['extent'] += int(stats[10])
169             result['duplicate'] += int(stats[11])
170             stats = self._recv().split(':')
171             if stats[0].startswith('error'):
172                 RapidLog.critical("lat stats error: unexpected lat bucket \
173                         reply (potential incompatibility between scripts \
174                         and PROX)")
175                 raise Exception("lat bucket reply error")
176             result['buckets'][0] = int(stats[1])
177             for i in range(1, 128):
178                 stats = self._recv().split(':')
179                 result['buckets'][i] += int(stats[1])
180         result['lat_avg'] = old_div(result['lat_avg'],
181                 number_tasks_returning_stats)
182         self._send('stats latency(0).used')
183         used = float(self._recv())
184         self._send('stats latency(0).total')
185         total = float(self._recv())
186         result['lat_used'] = old_div(used,total)
187         return (result)
188
189     def irq_stats(self, core, bucket, task=0):
190         self._send('stats task.core(%s).task(%s).irq(%s)' %
191                 (core, task, bucket))
192         stats = self._recv().split(',')
193         return int(stats[0])
194
195     def show_irq_buckets(self, core, task=0):
196         rx = tx = drop = tsc = hz = 0
197         self._send('show irq buckets %s %s' % (core,task))
198         buckets = self._recv().split(';')
199         buckets = buckets[:-1]
200         return buckets
201
202     def core_stats(self, cores, tasks=[0]):
203         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
204         self._send('dp core stats %s %s' % (','.join(map(str, cores)),
205             ','.join(map(str, tasks))))
206         for core in cores:
207             for task in tasks:
208                 stats = self._recv().split(',')
209                 if stats[0].startswith('error'):
210                     if stats[0].startswith('error: invalid syntax'):
211                         RapidLog.critical("dp core stats error: unexpected \
212                                 invalid syntax (potential incompatibility \
213                                 between scripts and PROX)")
214                         raise Exception("dp core stats error")
215                     continue
216                 rx += int(stats[0])
217                 tx += int(stats[1])
218                 rx_non_dp += int(stats[2])
219                 tx_non_dp += int(stats[3])
220                 drop += int(stats[4])
221                 tx_fail += int(stats[5])
222                 tsc = int(stats[6])
223                 hz = int(stats[7])
224         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
225
226     def multi_port_stats(self, ports=[0]):
227         rx = tx = port_id = tsc = no_mbufs = errors = 0
228         self._send('multi port stats %s' % (','.join(map(str, ports))))
229         result = self._recv().split(';')
230         if result[0].startswith('error'):
231             RapidLog.critical("multi port stats error: unexpected invalid \
232                     syntax (potential incompatibility between scripts and \
233                     PROX)")
234             raise Exception("multi port stats error")
235         for statistics in result:
236             stats = statistics.split(',')
237             port_id = int(stats[0])
238             rx += int(stats[1])
239             tx += int(stats[2])
240             no_mbufs += int(stats[3])
241             errors += int(stats[4])
242             tsc = int(stats[5])
243         return rx, tx, no_mbufs, errors, tsc
244
245     def set_random(self, cores, task, offset, mask, length):
246         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
247             task, offset, mask, length))
248
249     def set_size(self, cores, task, pkt_size):
250         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
251             pkt_size))
252
253     def set_imix(self, cores, task, imix):
254         self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
255             ','.join(map(str,imix))))
256
257     def set_value(self, cores, task, offset, value, length):
258         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
259             task, offset, value, length))
260
261     def quit_prox(self):
262         self._send('quit')
263
264     def _send(self, cmd):
265         """Append LF and send command to the PROX instance."""
266         if self._sock is None:
267             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
268         try:
269             self._sock.sendall(cmd.encode() + b'\n')
270         except ConnectionResetError as e:
271             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
272             raise
273
274     def _recv(self):
275         """Receive response from PROX instance, return it with LF removed."""
276         if self._sock is None:
277             raise RuntimeError("PROX socket closed, cannot receive anymore")
278         try:
279             pos = self._rcvd.find(b'\n')
280             while pos == -1:
281                 self._rcvd += self._sock.recv(256)
282                 pos = self._rcvd.find(b'\n')
283             rsp = self._rcvd[:pos]
284             self._rcvd = self._rcvd[pos+1:]
285         except ConnectionResetError as e:
286             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
287             raise
288         return rsp.decode()