Fix issue with system_ready_for_rapid file
[samplevnf.git] / VNFs / DPPD-PROX / helper-scripts / rapid / prox_ctrl.py
1 ##
2 ## Copyright (c) 2010-2020 Intel Corporation
3 ##
4 ## Licensed under the Apache License, Version 2.0 (the "License");
5 ## you may not use this file except in compliance with the License.
6 ## You may obtain a copy of the License at
7 ##
8 ##     http://www.apache.org/licenses/LICENSE-2.0
9 ##
10 ## Unless required by applicable law or agreed to in writing, software
11 ## distributed under the License is distributed on an "AS IS" BASIS,
12 ## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 ## See the License for the specific language governing permissions and
14 ## limitations under the License.
15 ##
16
17 from __future__ import print_function
18 from __future__ import division
19
20 from builtins import map
21 from builtins import range
22 from past.utils import old_div
23 from builtins import object
24 import os
25 import time
26 import subprocess
27 import socket
28 from rapid_log import RapidLog
29 from rapid_sshclient import SSHClient
30
31 class prox_ctrl(object):
32     def __init__(self, ip, key=None, user=None, password = None):
33         self._ip   = ip
34         self._key  = key
35         self._user = user
36         self._password = password
37         self._proxsock = []
38         self._sshclient = SSHClient(ip = ip, user = user, password = password,
39                 rsa_private_key = key, timeout = None)
40
41     def ip(self):
42         return self._ip
43
44     def test_connection(self):
45         attempts = 1
46         RapidLog.debug("Trying to connect to machine \
47                 on %s, attempt: %d" % (self._ip, attempts))
48         while True:
49             try:
50                 if (self.run_cmd('test -e /opt/rapid/system_ready_for_rapid \
51                         && echo exists')):
52                     break
53                 time.sleep(2)
54             except RuntimeWarning as ex:
55                 RapidLog.debug("RuntimeWarning %d:\n%s"
56                     % (ex.returncode, ex.output.strip()))
57                 attempts += 1
58                 if attempts > 20:
59                     RapidLog.exception("Failed to connect to instance after %d\
60                             attempts:\n%s" % (attempts, ex))
61                 time.sleep(2)
62                 RapidLog.debug("Trying to connect to machine \
63                        on %s, attempt: %d" % (self._ip, attempts))
64         RapidLog.debug("Connected to machine on %s" % self._ip)
65
66     def connect_socket(self):
67         attempts = 1
68         RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
69                 attempt: %d" % (self._ip, attempts))
70         sock = None
71         while True:
72             sock = self.prox_sock()
73             if sock is not None:
74                 break
75             attempts += 1
76             if attempts > 20:
77                 RapidLog.exception("Failed to connect to PROX on %s after %d \
78                         attempts" % (self._ip, attempts))
79             time.sleep(2)
80             RapidLog.debug("Trying to connect to PROX (just launched) on %s, \
81                     attempt: %d" % (self._ip, attempts))
82         RapidLog.info("Connected to PROX on %s" % self._ip)
83         return sock
84
85     def close(self):
86         for sock in self._proxsock:
87             sock.quit()
88
89     def run_cmd(self, command):
90         self._sshclient.run_cmd(command)
91         return self._sshclient.get_output()
92
93     def prox_sock(self, port=8474):
94         """Connect to the PROX instance on remote system.
95         Return a prox_sock object on success, None on failure.
96         """
97         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
98         try:
99             sock.connect((self._ip, port))
100             prox = prox_sock(sock)
101             self._proxsock.append(prox)
102             return prox
103         except:
104             return None
105
106     def scp_put(self, src, dst):
107         self._sshclient.scp_put(src, dst)
108         RapidLog.info("Copying from {} to {}:{}".format(src, self._ip, dst))
109
110     def scp_get(self, src, dst):
111         self._sshclient.scp_get('/home/' + self._user + src, dst)
112         RapidLog.info("Copying from {}:/home/{}{} to {}".format(self._ip,
113             self._user, src, dst))
114
115 class prox_sock(object):
116     def __init__(self, sock):
117         self._sock = sock
118         self._rcvd = b''
119
120     def __del__(self):
121         if self._sock is not None:
122             self._sock.close()
123             self._sock = None
124
125     def start(self, cores):
126         self._send('start %s' % ','.join(map(str, cores)))
127
128     def stop(self, cores):
129         self._send('stop %s' % ','.join(map(str, cores)))
130
131     def speed(self, speed, cores, tasks=[0]):
132         for core in cores:
133             for task in tasks:
134                 self._send('speed %s %s %s' % (core, task, speed))
135
136     def reset_stats(self):
137         self._send('reset stats')
138
139     def lat_stats(self, cores, tasks=[0]):
140         result = {}
141         result['lat_min'] = 999999999
142         result['lat_max'] = result['lat_avg'] = 0
143         result['buckets'] = [0] * 128
144         result['mis_ordered'] = 0
145         result['extent'] = 0
146         result['duplicate'] = 0
147         number_tasks_returning_stats = 0
148         self._send('lat all stats %s %s' % (','.join(map(str, cores)),
149             ','.join(map(str, tasks))))
150         for core in cores:
151             for task in tasks:
152                 stats = self._recv().split(',')
153             if 'is not measuring' in stats[0]:
154                 continue
155             if stats[0].startswith('error'):
156                 RapidLog.critical("lat stats error: unexpected reply from PROX\
157                         (potential incompatibility between scripts and PROX)")
158                 raise Exception("lat stats error")
159             number_tasks_returning_stats += 1
160             result['lat_min'] = min(int(stats[0]),result['lat_min'])
161             result['lat_max'] = max(int(stats[1]),result['lat_max'])
162             result['lat_avg'] += int(stats[2])
163             #min_since begin = int(stats[3])
164             #max_since_begin = int(stats[4])
165             result['lat_tsc'] = int(stats[5])
166             # Taking the last tsc as the timestamp since
167             # PROX will return the same tsc for each
168             # core/task combination
169             result['lat_hz'] = int(stats[6])
170             #coreid = int(stats[7])
171             #taskid = int(stats[8])
172             result['mis_ordered'] += int(stats[9])
173             result['extent'] += int(stats[10])
174             result['duplicate'] += int(stats[11])
175             stats = self._recv().split(':')
176             if stats[0].startswith('error'):
177                 RapidLog.critical("lat stats error: unexpected lat bucket \
178                         reply (potential incompatibility between scripts \
179                         and PROX)")
180                 raise Exception("lat bucket reply error")
181             result['buckets'][0] = int(stats[1])
182             for i in range(1, 128):
183                 stats = self._recv().split(':')
184                 result['buckets'][i] += int(stats[1])
185         result['lat_avg'] = old_div(result['lat_avg'],
186                 number_tasks_returning_stats)
187         self._send('stats latency(0).used')
188         used = float(self._recv())
189         self._send('stats latency(0).total')
190         total = float(self._recv())
191         result['lat_used'] = old_div(used,total)
192         return (result)
193
194     def irq_stats(self, core, bucket, task=0):
195         self._send('stats task.core(%s).task(%s).irq(%s)' %
196                 (core, task, bucket))
197         stats = self._recv().split(',')
198         return int(stats[0])
199
200     def show_irq_buckets(self, core, task=0):
201         rx = tx = drop = tsc = hz = 0
202         self._send('show irq buckets %s %s' % (core,task))
203         buckets = self._recv().split(';')
204         buckets = buckets[:-1]
205         return buckets
206
207     def core_stats(self, cores, tasks=[0]):
208         rx = tx = drop = tsc = hz = rx_non_dp = tx_non_dp = tx_fail = 0
209         self._send('dp core stats %s %s' % (','.join(map(str, cores)),
210             ','.join(map(str, tasks))))
211         for core in cores:
212             for task in tasks:
213                 stats = self._recv().split(',')
214                 if stats[0].startswith('error'):
215                     if stats[0].startswith('error: invalid syntax'):
216                         RapidLog.critical("dp core stats error: unexpected \
217                                 invalid syntax (potential incompatibility \
218                                 between scripts and PROX)")
219                         raise Exception("dp core stats error")
220                     continue
221                 rx += int(stats[0])
222                 tx += int(stats[1])
223                 rx_non_dp += int(stats[2])
224                 tx_non_dp += int(stats[3])
225                 drop += int(stats[4])
226                 tx_fail += int(stats[5])
227                 tsc = int(stats[6])
228                 hz = int(stats[7])
229         return rx, rx_non_dp, tx, tx_non_dp, drop, tx_fail, tsc, hz
230
231     def multi_port_stats(self, ports=[0]):
232         rx = tx = port_id = tsc = no_mbufs = errors = 0
233         self._send('multi port stats %s' % (','.join(map(str, ports))))
234         result = self._recv().split(';')
235         if result[0].startswith('error'):
236             RapidLog.critical("multi port stats error: unexpected invalid \
237                     syntax (potential incompatibility between scripts and \
238                     PROX)")
239             raise Exception("multi port stats error")
240         for statistics in result:
241             stats = statistics.split(',')
242             port_id = int(stats[0])
243             rx += int(stats[1])
244             tx += int(stats[2])
245             no_mbufs += int(stats[3])
246             errors += int(stats[4])
247             tsc = int(stats[5])
248         return rx, tx, no_mbufs, errors, tsc
249
250     def set_random(self, cores, task, offset, mask, length):
251         self._send('set random %s %s %s %s %s' % (','.join(map(str, cores)),
252             task, offset, mask, length))
253
254     def set_size(self, cores, task, pkt_size):
255         self._send('pkt_size %s %s %s' % (','.join(map(str, cores)), task,
256             pkt_size))
257
258     def set_imix(self, cores, task, imix):
259         self._send('imix %s %s %s' % (','.join(map(str, cores)), task,
260             ','.join(map(str,imix))))
261
262     def set_value(self, cores, task, offset, value, length):
263         self._send('set value %s %s %s %s %s' % (','.join(map(str, cores)),
264             task, offset, value, length))
265
266     def quit_prox(self):
267         self._send('quit')
268
269     def _send(self, cmd):
270         """Append LF and send command to the PROX instance."""
271         if self._sock is None:
272             raise RuntimeError("PROX socket closed, cannot send '%s'" % cmd)
273         try:
274             self._sock.sendall(cmd.encode() + b'\n')
275         except ConnectionResetError as e:
276             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
277             raise
278
279     def _recv(self):
280         """Receive response from PROX instance, return it with LF removed."""
281         if self._sock is None:
282             raise RuntimeError("PROX socket closed, cannot receive anymore")
283         try:
284             pos = self._rcvd.find(b'\n')
285             while pos == -1:
286                 self._rcvd += self._sock.recv(256)
287                 pos = self._rcvd.find(b'\n')
288             rsp = self._rcvd[:pos]
289             self._rcvd = self._rcvd[pos+1:]
290         except ConnectionResetError as e:
291             RapidLog.error('Pipe reset by Prox instance: traffic too high?')
292             raise
293         return rsp.decode()