JIRA: BOTTLENECKS-29
[bottlenecks.git] / vstf / vstf / controller / spirent / common / model.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import re
11 import copy
12 import time
13 import ConfigParser
14
15 fwd = {'single': ['forward'],
16        'double': ['forward', 'reverse']
17        }
18 models = ['Tnv']
19 direction = ['single', 'double']
20 reverse_dict = {
21     'forward': 'reverse',
22     'reverse': 'forward'
23 }
24
25
26 class BaseModel(object):
27     def __init__(self, config):
28         self.config = config
29
30     def _check_model(self):
31         return self.config['model'] in models
32
33     def _check_virtenv(self):
34         try:
35             num = int(self.config['virtenv'])
36             return num in range(1, 9)
37         except:
38             print("[ERROR]The virtenv is not a inter number.")
39
40     def _check_queues(self):
41         try:
42             num = int(self.config['queues'])
43             return num in range(1, 9)
44         except:
45             print("[ERROR]The virt queues is not a inter number.")
46
47     @property
48     def _check_flows(self):
49         try:
50             num = int(self.config['flows'])
51             return num in range(1, 9)
52         except:
53             print("[ERROR]The flow is not a inter number.")
54
55     def _check_direct(self):
56         return self.config['direct'] in direction
57
58     def _check_vlans(self):
59         return self.config['vlans'] in ['True', 'False']
60
61     def _check_bind(self):
62         return True
63
64     def check_parameter_invalid(self):
65         try:
66             if self._check_model() and \
67                     self._check_virtenv() and \
68                     self._check_queues() and \
69                     self._check_flows and \
70                     self._check_direct() and \
71                     self._check_vlans() and \
72                     self._check_bind():
73                 return True
74             else:
75                 print("[ERROR]Paramter check invalid")
76                 return False
77         except:
78             print("[ERROR]Check parameter invalid with unknown reason.")
79             return False
80
81
82 def _get_array_values(irq_array):
83     proc_list = []
84     for i in range(len(irq_array)):
85         proc_list.append(irq_array[i][1])
86     return sorted(proc_list)
87
88
89 def check_dict(thread_info, flow):
90     if thread_info['src_recv_irq'] != flow['src_recv_irq']:
91         print("[WARN]Flow src_irq process %s not match %s in the table."
92               % (thread_info['src_recv_irq'],
93                  flow['src_recv_irq']))
94         return False
95     if thread_info['dst_send_irq'] != flow['dst_send_irq']:
96         print("[WARN]Flow dst_irq process %s not match %s in the table."
97               % (thread_info['dst_send_irq'],
98                  flow['dst_send_irq']))
99         return False
100     return True
101
102
103 def dst_ip_update(flow):
104     try:
105         src_dst_ip = flow['dst_ip']
106         ip_section = '.'.join(src_dst_ip.split('.')[0:3]) + '.'
107         number = int(src_dst_ip.split('.')[3])
108         new_number = number + 1
109         new_dst_ip = ip_section + str(new_number)
110         flow['dst_ip'] = new_dst_ip
111     except:
112         print("[ERROR]dst ip update failed.")
113
114
115 def _tranfer_array_to_range(array):
116     return str(array[0]) + '-' + str(array[-1])
117
118
119 class TnV(BaseModel):
120     def __init__(self, config):
121         super(TnV, self).__init__(config)
122         self.config = config
123         self.host_instance = None
124         self.send_instace = None
125         self.vms = None
126         self.init_flows = {}
127         handle = ConfigParser.ConfigParser()
128         handle.read(self.config['configfile'])
129         self.handle = handle
130
131     def _get_vms(self):
132         return self.host_instance.get_libvirt_vms()
133
134     def flow_match(self):
135         _queues = int(self.config['queues'])
136         _virtenv = int(self.config['virtenv'])
137         _flows = int(self.config['flows'])
138         return _flows == _queues * _virtenv
139
140     def match_virt_env(self):
141         try:
142             self.vms = self._get_vms()
143             return len(self.vms) == int(self.config['virtenv'])
144         except:
145             print("[ERROR]vms or containers number is equal to virtenv.")
146             return False
147
148     @property
149     def match_flows_and_nic(self):
150         # get src_nic
151         for section in ['send', 'recv']:
152             nic = self._get_nic_from_file(section, 'nic')
153             try:
154                 irq_proc = self.host_instance.get_nic_interrupt_proc(nic)
155                 return int(self.config['flows']) == len(irq_proc)
156             except:
157                 print("[ERROR]match flow with nic interrupt failed.")
158                 return False
159
160     def _get_nic_irq_proc(self, nic):
161         return self.host_instance.get_nic_interrupt_proc(nic)
162
163     def _get_nic_from_file(self, section, column):
164         return self.handle.get(section, column)
165
166     def _get_range(self, section, column):
167         try:
168             info = self.handle.get(section, column)
169             return info.split(' ')
170         except:
171             print("[ERROR]Get mac failed.")
172             return False
173
174     def check_mac_valid(self):
175         flag = True
176         try:
177             for option in ['send', 'recv']:
178                 info = self.handle.get(option, 'macs')
179                 macs = info.split()
180                 if len(macs) != int(self.config['virtenv']) or macs == []:
181                     print("[ERROR]The macs number is not equal to vms or containers.")
182                     return False
183                 for mac in macs:
184                     # check mac valid
185                     if re.match(r'..:..:..:..:..:..', mac):
186                         continue
187                     else:
188                         print("[ERROR]mac %s invalid" % mac)
189                         flag = False
190                         break
191                 if not flag:
192                     break
193             return flag
194         except:
195             print("[ERROR]parse macs failed.")
196             return False
197
198     def check_vlan_valid(self):
199         flag = True
200         for direct in ['send', 'recv']:
201             vlans = self.handle.get(direct, 'vlans').split()
202             if len(vlans) != int(self.config['virtenv']):
203                 print("[ERROR]vlan un config")
204                 return False
205             for vlan in vlans:
206                 if int(vlan) <= 1 or int(vlan) >= 4095:
207                     flag = False
208                     break
209         return flag
210
211     @property
212     def check_logic_invalid(self):
213         return self.flow_match() and self.match_virt_env() and \
214                self.match_flows_and_nic and self.check_mac_valid() and \
215                self.check_vlan_valid()
216
217     @property
218     def read_flow_init(self):
219         # The 
220         temp_flow = {}
221         src_macs = self._get_range('send', 'macs')
222         dst_macs = self._get_range('recv', 'macs')
223         src_vlan = self._get_range('send', 'vlans')
224         dst_vlan = self._get_range('recv', 'vlans')
225         src_nic = self._get_nic_from_file('send', 'nic')
226         dst_nic = self._get_nic_from_file('recv', 'nic')
227         src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic))
228         dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic))
229         src_ip_sections = self._get_range('send', 'ip_sections')
230         dst_ip_sections = self._get_range('recv', 'ip_sections')
231         send_port = self._get_nic_from_file('send', 'port')
232         recv_port = self._get_nic_from_file('recv', 'port')
233         temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip')
234         vlan = src_vlan
235         avg_flow = int(self.config['flows']) / int(self.config['virtenv'])
236         # build the main dictionary 
237         for _direct in sorted(fwd[self.config['direct']]):
238             i = 0
239             j = 0
240             temp_flow['direct'] = _direct
241             temp_flow['send_port'] = send_port
242             temp_flow['recv_port'] = recv_port
243
244             for _vm in sorted(self.vms):
245                 vlan_id = {
246                     'True': vlan[i],
247                     'False': None}
248                 temp_flow['virt'] = _vm
249                 _vm_info = self.host_instance.get_vm_info(_vm)
250                 temp_flow['qemu_proc'] = _vm_info['main_pid']
251                 # temp_flow['qemu_thread']  = _vm_info['qemu_thread']
252                 temp_flow['mem_numa'] = _vm_info['mem_numa']
253                 # temp_flow['vhost_thread'] = _vm_info['vhost_thread']
254
255                 temp_flow['src_mac'] = src_macs[i]
256                 temp_flow['dst_mac'] = dst_macs[i]
257                 temp_flow['vlan'] = vlan_id[self.config['vlans']]
258                 src_ip = src_ip_sections[i]
259                 dst_ip = dst_ip_sections[i]
260                 temp_flow['src_ip'] = src_ip
261                 temp_flow['dst_ip'] = dst_ip
262                 vm_index = sorted(self.vms).index(_vm)
263                 for _queue in range(1, int(self.config['queues']) + 1):
264                     # flow info
265                     temp_flow['queue'] = _queue
266                     # fwd thread
267
268                     temp_flow['qemu_thread_list'] = _vm_info['qemu_thread']
269                     forward_core = {
270                         "forward": _vm_info['qemu_thread'][_queue + avg_flow * vm_index],
271                         "reverse": _vm_info['qemu_thread'][_queue + avg_flow * vm_index + int(self.config['flows'])]
272                     }
273                     temp_flow['fwd_thread'] = forward_core[_direct]
274
275                     temp_flow['fwd_vhost'] = None
276                     # nic interrupts info
277                     temp_flow['src_recv_irq'] = src_nic_irq[j]
278                     temp_flow['src_nic'] = src_nic
279                     temp_flow['dst_send_irq'] = dst_nic_irq[j]
280                     temp_flow['dst_nic'] = dst_nic
281                     # above all
282                     j += 1
283                     self.init_flows[_direct + '_' + _vm + '_' + str(_queue)] = copy.deepcopy(temp_flow)
284                 i += 1
285             src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq
286             vlan = dst_vlan
287             send_port, recv_port = recv_port, send_port
288             src_nic, dst_nic = dst_nic, src_nic
289             src_macs, dst_macs = dst_macs, src_macs
290             src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections
291         # return sorted(self.init_flows.iteritems(), key=lambda d:d[0])
292         return self.init_flows
293
294     def mac_learning(self, flowa, flowb):
295         flowa = str(flowa)
296         flowb = str(flowb)
297         ret = self.send_instace.mac_learning(flowa, flowb)
298         return ret
299
300     def send_packet(self, flow):
301         flow = str(flow)
302         # return a stream block handle
303         return self.send_instace.send_packet(flow)
304
305     def stop_flow(self, streamblock, flow):
306         flow = str(flow)
307         return self.send_instace.stop_flow(streamblock, flow)
308
309     def catch_thread_info(self):
310         return self.host_instance.catch_thread_info()
311
312     def set_thread2flow(self, thread_info, flow):
313         flow['fwd_vhost'] = thread_info['fwd_vhost']
314         return True
315
316     @property
317     def flow_build(self):
318         for _direct in fwd[self.config['direct']]:
319             for _vm in self.vms:
320                 for _queue in range(1, int(self.config['queues']) + 1):
321                     i = 0
322                     while i < 50:
323                         try:
324                             i += 1
325                             thread_info = None
326                             self.mac_learning(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)],
327                                               self.init_flows[reverse_dict[_direct] + '_' + _vm + '_' + str(_queue)])
328                             streamblock = self.send_packet(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
329                             time.sleep(1)
330                             result, thread_info = self.catch_thread_info()
331                             thread_info = eval(thread_info)
332                             self.stop_flow(streamblock, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
333                             time.sleep(1)
334                             if not result:
335                                 print("[ERROR]Catch the thread info failed.")
336                                 break
337                         except:
338                             print("[ERROR]send flow failed error or get host thread info failed.")
339
340                         # compare the got thread info to
341                         if check_dict(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]):
342                             self.set_thread2flow(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
343                             print("[INFO]Flow %s_%s_%s :     fwd_vhost %s    src_recv_irq %s   dst_send_irq %s"
344                                   % (_direct, _vm, _queue, thread_info['fwd_vhost'], thread_info['src_recv_irq'],
345                                      thread_info['dst_send_irq']))
346                             print("%s" % (self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]))
347                             break
348                         else:
349                             dst_ip_update(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
350         return self.init_flows
351
352     def affinity_bind(self, aff_strategy):
353         # get the forward cores
354         qemu_list = []
355         qemu_other = []
356         src_vhost = []
357         dst_vhost = []
358         src_irq = []
359         dst_irq = []
360
361         # recognize the thread id
362         for flowname in sorted(self.init_flows.keys()):
363             tmp_thread = self.init_flows[flowname]['fwd_thread']
364             qemu_other = qemu_other + copy.deepcopy(self.init_flows[flowname]['qemu_thread_list'])
365             qemu_list.append(tmp_thread)
366             if self.init_flows[flowname]['direct'] == 'forward':
367                 dst_vhost.append(self.init_flows[flowname]['fwd_vhost'])
368                 src_irq.append(self.init_flows[flowname]['src_recv_irq'])
369                 dst_irq.append(self.init_flows[flowname]['dst_send_irq'])
370             elif self.init_flows[flowname]['direct'] == 'reverse':
371                 src_vhost.append(self.init_flows[flowname]['fwd_vhost'])
372                 dst_irq.append(self.init_flows[flowname]['src_recv_irq'])
373                 src_irq.append(self.init_flows[flowname]['dst_send_irq'])
374
375         qemu_list = sorted({}.fromkeys(qemu_list).keys())
376         src_vhost = sorted({}.fromkeys(src_vhost).keys())
377         dst_vhost = sorted({}.fromkeys(dst_vhost).keys())
378         src_irq = sorted({}.fromkeys(src_irq).keys())
379         dst_irq = sorted({}.fromkeys(dst_irq).keys())
380
381         # get the qemu thread except the forward core
382         qemu_other = sorted({}.fromkeys(qemu_other).keys())
383         for i in qemu_list:
384             qemu_other.remove(i)
385         # get the bind strategy
386         handle = ConfigParser.ConfigParser()
387         handle.read(self.config['strategyfile'])
388         try:
389             qemu_numa = handle.get('strategy' + self.config['strategy'], 'qemu_numa')
390             src_vhost_numa = handle.get('strategy' + self.config['strategy'], 'src_vhost_numa')
391             dst_vhost_numa = handle.get('strategy' + self.config['strategy'], 'dst_vhost_numa')
392             src_irq_numa = handle.get('strategy' + self.config['strategy'], 'src_irq_numa')
393             dst_irq_numa = handle.get('strategy' + self.config['strategy'], 'dst_irq_numa')
394             loan_numa = handle.get('strategy' + self.config['strategy'], 'loan_numa')
395         except:
396             print("[ERROR]Parse the strategy file failed or get the options failed.")
397
398         for value in [qemu_numa, src_vhost_numa, dst_vhost_numa, src_irq_numa, dst_irq_numa, loan_numa]:
399             if value is not None or value == '':
400                 raise ValueError('some option in the strategy file is none.')
401         # cores mapping thread
402         numa_topo = self.host_instance.get_numa_core()
403         numa_topo = eval(numa_topo)
404         # first check the cores number
405
406         # order src_irq dst_irq src_vhost dst_vhost qemu_list
407         for node in numa_topo.keys():
408             numa_topo[node]['process'] = []
409             if 'node' + src_irq_numa == node:
410                 numa_topo[node]['process'] = numa_topo[node]['process'] + src_irq
411             if 'node' + dst_irq_numa == node:
412                 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_irq
413             if 'node' + src_vhost_numa == node:
414                 numa_topo[node]['process'] = numa_topo[node]['process'] + src_vhost
415             if 'node' + dst_vhost_numa == node:
416                 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_vhost
417             if 'node' + qemu_numa == node:
418                 numa_topo[node]['process'] = numa_topo[node]['process'] + qemu_list
419         loan_cores = ''
420         for node in numa_topo.keys():
421             if len(numa_topo[node]['process']) > len(numa_topo[node]['phy_cores']):
422                 # length distance
423                 diff = len(numa_topo[node]['process']) - len(numa_topo[node]['phy_cores'])
424                 # first deep copy
425                 numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa]['process'] + copy.deepcopy(
426                     numa_topo[node]['process'][-diff:])
427                 cores_str = _tranfer_array_to_range(numa_topo['node' + loan_numa]['phy_cores'][diff:])
428                 loan_cores = ','.join([loan_cores, cores_str])
429                 numa_topo[node]['process'] = numa_topo[node]['process'][0:-diff]
430         loan_cores = loan_cores[1:]
431         loan_bind_list = {}
432         for proc_loan in qemu_other:
433             loan_bind_list[proc_loan] = loan_cores
434
435         bind_list = {}
436         for node in numa_topo.keys():
437             for i in range(len(numa_topo[node]['process'])):
438                 bind_list[numa_topo[node]['process'][i]] = str(numa_topo[node]['phy_cores'][i])
439         bind_list.update(loan_bind_list)
440         for key in bind_list.keys():
441             self.host_instance.bind_cpu(bind_list[key], key)
442         print bind_list
443         return True
444
445     def testrun(self, suite):
446         global forward_init_flows, reverse_init_flows
447         try:
448             forward_init_flows = {}
449             reverse_init_flows = {}
450             for key in self.init_flows.keys():
451                 if self.init_flows[key]['direct'] == "forward":
452                     forward_init_flows[key] = self.init_flows[key]
453                 elif self.init_flows[key]['direct'] == "reverse":
454                     reverse_init_flows[key] = self.init_flows[key]
455             forward_init_flows = str(forward_init_flows)
456             reverse_init_flows = str(reverse_init_flows)
457         except:
458             print("[ERROR]init the forward and reverse flow failed.")
459
460         if suite == "throughput":
461             print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test")
462             ret, result = self.send_instace.run_rfc2544_throughput(forward_init_flows, reverse_init_flows)
463         elif suite == "frameloss":
464             print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test")
465             ret, result = self.send_instace.run_rfc2544_frameloss(forward_init_flows, reverse_init_flows)
466         return ret, result