Upload the contribution of vstf as bottleneck network framework.
[bottlenecks.git] / vstf / vstf / controller / spirent / common / model.py
1 #!/usr/bin/python
2 """
3     @author: l00190809
4     @group: Huawei Ltd 
5 """
6 import re
7 import copy
8 import time
9 import ConfigParser
10
11 fwd = {'single': ['forward'],
12        'double': ['forward', 'reverse']
13        }
14 models = ['Tnv']
15 direction = ['single', 'double']
16 reverse_dict = {
17     'forward': 'reverse',
18     'reverse': 'forward'
19 }
20
21
22 class BaseModel(object):
23     def __init__(self, config):
24         self.config = config
25
26     def _check_model(self):
27         return self.config['model'] in models
28
29     def _check_virtenv(self):
30         try:
31             num = int(self.config['virtenv'])
32             return num in range(1, 9)
33         except:
34             print("[ERROR]The virtenv is not a inter number.")
35
36     def _check_queues(self):
37         try:
38             num = int(self.config['queues'])
39             return num in range(1, 9)
40         except:
41             print("[ERROR]The virt queues is not a inter number.")
42
43     @property
44     def _check_flows(self):
45         try:
46             num = int(self.config['flows'])
47             return num in range(1, 9)
48         except:
49             print("[ERROR]The flow is not a inter number.")
50
51     def _check_direct(self):
52         return self.config['direct'] in direction
53
54     def _check_vlans(self):
55         return self.config['vlans'] in ['True', 'False']
56
57     def _check_bind(self):
58         return True
59
60     def check_parameter_invalid(self):
61         try:
62             if self._check_model() and \
63                     self._check_virtenv() and \
64                     self._check_queues() and \
65                     self._check_flows and \
66                     self._check_direct() and \
67                     self._check_vlans() and \
68                     self._check_bind():
69                 return True
70             else:
71                 print("[ERROR]Paramter check invalid")
72                 return False
73         except:
74             print("[ERROR]Check parameter invalid with unknown reason.")
75             return False
76
77
78 def _get_array_values(irq_array):
79     proc_list = []
80     for i in range(len(irq_array)):
81         proc_list.append(irq_array[i][1])
82     return sorted(proc_list)
83
84
85 def check_dict(thread_info, flow):
86     if thread_info['src_recv_irq'] != flow['src_recv_irq']:
87         print("[WARN]Flow src_irq process %s not match %s in the table."
88               % (thread_info['src_recv_irq'],
89                  flow['src_recv_irq']))
90         return False
91     if thread_info['dst_send_irq'] != flow['dst_send_irq']:
92         print("[WARN]Flow dst_irq process %s not match %s in the table."
93               % (thread_info['dst_send_irq'],
94                  flow['dst_send_irq']))
95         return False
96     return True
97
98
99 def dst_ip_update(flow):
100     try:
101         src_dst_ip = flow['dst_ip']
102         ip_section = '.'.join(src_dst_ip.split('.')[0:3]) + '.'
103         number = int(src_dst_ip.split('.')[3])
104         new_number = number + 1
105         new_dst_ip = ip_section + str(new_number)
106         flow['dst_ip'] = new_dst_ip
107     except:
108         print("[ERROR]dst ip update failed.")
109
110
111 def _tranfer_array_to_range(array):
112     return str(array[0]) + '-' + str(array[-1])
113
114
115 class TnV(BaseModel):
116     def __init__(self, config):
117         super(TnV, self).__init__(config)
118         self.config = config
119         self.host_instance = None
120         self.send_instace = None
121         self.vms = None
122         self.init_flows = {}
123         handle = ConfigParser.ConfigParser()
124         handle.read(self.config['configfile'])
125         self.handle = handle
126
127     def _get_vms(self):
128         return self.host_instance.get_libvirt_vms()
129
130     def flow_match(self):
131         _queues = int(self.config['queues'])
132         _virtenv = int(self.config['virtenv'])
133         _flows = int(self.config['flows'])
134         return _flows == _queues * _virtenv
135
136     def match_virt_env(self):
137         try:
138             self.vms = self._get_vms()
139             return len(self.vms) == int(self.config['virtenv'])
140         except:
141             print("[ERROR]vms or containers number is equal to virtenv.")
142             return False
143
144     @property
145     def match_flows_and_nic(self):
146         # get src_nic
147         for section in ['send', 'recv']:
148             nic = self._get_nic_from_file(section, 'nic')
149             try:
150                 irq_proc = self.host_instance.get_nic_interrupt_proc(nic)
151                 return int(self.config['flows']) == len(irq_proc)
152             except:
153                 print("[ERROR]match flow with nic interrupt failed.")
154                 return False
155
156     def _get_nic_irq_proc(self, nic):
157         return self.host_instance.get_nic_interrupt_proc(nic)
158
159     def _get_nic_from_file(self, section, column):
160         return self.handle.get(section, column)
161
162     def _get_range(self, section, column):
163         try:
164             info = self.handle.get(section, column)
165             return info.split(' ')
166         except:
167             print("[ERROR]Get mac failed.")
168             return False
169
170     def check_mac_valid(self):
171         flag = True
172         try:
173             for option in ['send', 'recv']:
174                 info = self.handle.get(option, 'macs')
175                 macs = info.split()
176                 if len(macs) != int(self.config['virtenv']) or macs == []:
177                     print("[ERROR]The macs number is not equal to vms or containers.")
178                     return False
179                 for mac in macs:
180                     # check mac valid
181                     if re.match(r'..:..:..:..:..:..', mac):
182                         continue
183                     else:
184                         print("[ERROR]mac %s invalid" % mac)
185                         flag = False
186                         break
187                 if not flag:
188                     break
189             return flag
190         except:
191             print("[ERROR]parse macs failed.")
192             return False
193
194     def check_vlan_valid(self):
195         flag = True
196         for direct in ['send', 'recv']:
197             vlans = self.handle.get(direct, 'vlans').split()
198             if len(vlans) != int(self.config['virtenv']):
199                 print("[ERROR]vlan un config")
200                 return False
201             for vlan in vlans:
202                 if int(vlan) <= 1 or int(vlan) >= 4095:
203                     flag = False
204                     break
205         return flag
206
207     @property
208     def check_logic_invalid(self):
209         return self.flow_match() and self.match_virt_env() and \
210                self.match_flows_and_nic and self.check_mac_valid() and \
211                self.check_vlan_valid()
212
213     @property
214     def read_flow_init(self):
215         # The 
216         temp_flow = {}
217         src_macs = self._get_range('send', 'macs')
218         dst_macs = self._get_range('recv', 'macs')
219         src_vlan = self._get_range('send', 'vlans')
220         dst_vlan = self._get_range('recv', 'vlans')
221         src_nic = self._get_nic_from_file('send', 'nic')
222         dst_nic = self._get_nic_from_file('recv', 'nic')
223         src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic))
224         dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic))
225         src_ip_sections = self._get_range('send', 'ip_sections')
226         dst_ip_sections = self._get_range('recv', 'ip_sections')
227         send_port = self._get_nic_from_file('send', 'port')
228         recv_port = self._get_nic_from_file('recv', 'port')
229         temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip')
230         vlan = src_vlan
231         avg_flow = int(self.config['flows']) / int(self.config['virtenv'])
232         # build the main dictionary 
233         for _direct in sorted(fwd[self.config['direct']]):
234             i = 0
235             j = 0
236             temp_flow['direct'] = _direct
237             temp_flow['send_port'] = send_port
238             temp_flow['recv_port'] = recv_port
239
240             for _vm in sorted(self.vms):
241                 vlan_id = {
242                     'True': vlan[i],
243                     'False': None}
244                 temp_flow['virt'] = _vm
245                 _vm_info = self.host_instance.get_vm_info(_vm)
246                 temp_flow['qemu_proc'] = _vm_info['main_pid']
247                 # temp_flow['qemu_thread']  = _vm_info['qemu_thread']
248                 temp_flow['mem_numa'] = _vm_info['mem_numa']
249                 # temp_flow['vhost_thread'] = _vm_info['vhost_thread']
250
251                 temp_flow['src_mac'] = src_macs[i]
252                 temp_flow['dst_mac'] = dst_macs[i]
253                 temp_flow['vlan'] = vlan_id[self.config['vlans']]
254                 src_ip = src_ip_sections[i]
255                 dst_ip = dst_ip_sections[i]
256                 temp_flow['src_ip'] = src_ip
257                 temp_flow['dst_ip'] = dst_ip
258                 vm_index = sorted(self.vms).index(_vm)
259                 for _queue in range(1, int(self.config['queues']) + 1):
260                     # flow info
261                     temp_flow['queue'] = _queue
262                     # fwd thread
263
264                     temp_flow['qemu_thread_list'] = _vm_info['qemu_thread']
265                     forward_core = {
266                         "forward": _vm_info['qemu_thread'][_queue + avg_flow * vm_index],
267                         "reverse": _vm_info['qemu_thread'][_queue + avg_flow * vm_index + int(self.config['flows'])]
268                     }
269                     temp_flow['fwd_thread'] = forward_core[_direct]
270
271                     temp_flow['fwd_vhost'] = None
272                     # nic interrupts info
273                     temp_flow['src_recv_irq'] = src_nic_irq[j]
274                     temp_flow['src_nic'] = src_nic
275                     temp_flow['dst_send_irq'] = dst_nic_irq[j]
276                     temp_flow['dst_nic'] = dst_nic
277                     # above all
278                     j += 1
279                     self.init_flows[_direct + '_' + _vm + '_' + str(_queue)] = copy.deepcopy(temp_flow)
280                 i += 1
281             src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq
282             vlan = dst_vlan
283             send_port, recv_port = recv_port, send_port
284             src_nic, dst_nic = dst_nic, src_nic
285             src_macs, dst_macs = dst_macs, src_macs
286             src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections
287         # return sorted(self.init_flows.iteritems(), key=lambda d:d[0])
288         return self.init_flows
289
290     def mac_learning(self, flowa, flowb):
291         flowa = str(flowa)
292         flowb = str(flowb)
293         ret = self.send_instace.mac_learning(flowa, flowb)
294         return ret
295
296     def send_packet(self, flow):
297         flow = str(flow)
298         # return a stream block handle
299         return self.send_instace.send_packet(flow)
300
301     def stop_flow(self, streamblock, flow):
302         flow = str(flow)
303         return self.send_instace.stop_flow(streamblock, flow)
304
305     def catch_thread_info(self):
306         return self.host_instance.catch_thread_info()
307
308     def set_thread2flow(self, thread_info, flow):
309         flow['fwd_vhost'] = thread_info['fwd_vhost']
310         return True
311
312     @property
313     def flow_build(self):
314         for _direct in fwd[self.config['direct']]:
315             for _vm in self.vms:
316                 for _queue in range(1, int(self.config['queues']) + 1):
317                     i = 0
318                     while i < 50:
319                         try:
320                             i += 1
321                             thread_info = None
322                             self.mac_learning(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)],
323                                               self.init_flows[reverse_dict[_direct] + '_' + _vm + '_' + str(_queue)])
324                             streamblock = self.send_packet(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
325                             time.sleep(1)
326                             result, thread_info = self.catch_thread_info()
327                             thread_info = eval(thread_info)
328                             self.stop_flow(streamblock, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
329                             time.sleep(1)
330                             if not result:
331                                 print("[ERROR]Catch the thread info failed.")
332                                 break
333                         except:
334                             print("[ERROR]send flow failed error or get host thread info failed.")
335
336                         # compare the got thread info to
337                         if check_dict(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]):
338                             self.set_thread2flow(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
339                             print("[INFO]Flow %s_%s_%s :     fwd_vhost %s    src_recv_irq %s   dst_send_irq %s"
340                                   % (_direct, _vm, _queue, thread_info['fwd_vhost'], thread_info['src_recv_irq'],
341                                      thread_info['dst_send_irq']))
342                             print("%s" % (self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]))
343                             break
344                         else:
345                             dst_ip_update(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
346         return self.init_flows
347
348     def affinity_bind(self, aff_strategy):
349         # get the forward cores
350         qemu_list = []
351         qemu_other = []
352         src_vhost = []
353         dst_vhost = []
354         src_irq = []
355         dst_irq = []
356
357         # recognize the thread id
358         for flowname in sorted(self.init_flows.keys()):
359             tmp_thread = self.init_flows[flowname]['fwd_thread']
360             qemu_other = qemu_other + copy.deepcopy(self.init_flows[flowname]['qemu_thread_list'])
361             qemu_list.append(tmp_thread)
362             if self.init_flows[flowname]['direct'] == 'forward':
363                 dst_vhost.append(self.init_flows[flowname]['fwd_vhost'])
364                 src_irq.append(self.init_flows[flowname]['src_recv_irq'])
365                 dst_irq.append(self.init_flows[flowname]['dst_send_irq'])
366             elif self.init_flows[flowname]['direct'] == 'reverse':
367                 src_vhost.append(self.init_flows[flowname]['fwd_vhost'])
368                 dst_irq.append(self.init_flows[flowname]['src_recv_irq'])
369                 src_irq.append(self.init_flows[flowname]['dst_send_irq'])
370
371         qemu_list = sorted({}.fromkeys(qemu_list).keys())
372         src_vhost = sorted({}.fromkeys(src_vhost).keys())
373         dst_vhost = sorted({}.fromkeys(dst_vhost).keys())
374         src_irq = sorted({}.fromkeys(src_irq).keys())
375         dst_irq = sorted({}.fromkeys(dst_irq).keys())
376
377         # get the qemu thread except the forward core
378         qemu_other = sorted({}.fromkeys(qemu_other).keys())
379         for i in qemu_list:
380             qemu_other.remove(i)
381         # get the bind strategy
382         handle = ConfigParser.ConfigParser()
383         handle.read(self.config['strategyfile'])
384         try:
385             qemu_numa = handle.get('strategy' + self.config['strategy'], 'qemu_numa')
386             src_vhost_numa = handle.get('strategy' + self.config['strategy'], 'src_vhost_numa')
387             dst_vhost_numa = handle.get('strategy' + self.config['strategy'], 'dst_vhost_numa')
388             src_irq_numa = handle.get('strategy' + self.config['strategy'], 'src_irq_numa')
389             dst_irq_numa = handle.get('strategy' + self.config['strategy'], 'dst_irq_numa')
390             loan_numa = handle.get('strategy' + self.config['strategy'], 'loan_numa')
391         except:
392             print("[ERROR]Parse the strategy file failed or get the options failed.")
393
394         for value in [qemu_numa, src_vhost_numa, dst_vhost_numa, src_irq_numa, dst_irq_numa, loan_numa]:
395             if value is not None or value == '':
396                 raise ValueError('some option in the strategy file is none.')
397         # cores mapping thread
398         numa_topo = self.host_instance.get_numa_core()
399         numa_topo = eval(numa_topo)
400         # first check the cores number
401
402         # order src_irq dst_irq src_vhost dst_vhost qemu_list
403         for node in numa_topo.keys():
404             numa_topo[node]['process'] = []
405             if 'node' + src_irq_numa == node:
406                 numa_topo[node]['process'] = numa_topo[node]['process'] + src_irq
407             if 'node' + dst_irq_numa == node:
408                 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_irq
409             if 'node' + src_vhost_numa == node:
410                 numa_topo[node]['process'] = numa_topo[node]['process'] + src_vhost
411             if 'node' + dst_vhost_numa == node:
412                 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_vhost
413             if 'node' + qemu_numa == node:
414                 numa_topo[node]['process'] = numa_topo[node]['process'] + qemu_list
415         loan_cores = ''
416         for node in numa_topo.keys():
417             if len(numa_topo[node]['process']) > len(numa_topo[node]['phy_cores']):
418                 # length distance
419                 diff = len(numa_topo[node]['process']) - len(numa_topo[node]['phy_cores'])
420                 # first deep copy
421                 numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa]['process'] + copy.deepcopy(
422                     numa_topo[node]['process'][-diff:])
423                 cores_str = _tranfer_array_to_range(numa_topo['node' + loan_numa]['phy_cores'][diff:])
424                 loan_cores = ','.join([loan_cores, cores_str])
425                 numa_topo[node]['process'] = numa_topo[node]['process'][0:-diff]
426         loan_cores = loan_cores[1:]
427         loan_bind_list = {}
428         for proc_loan in qemu_other:
429             loan_bind_list[proc_loan] = loan_cores
430
431         bind_list = {}
432         for node in numa_topo.keys():
433             for i in range(len(numa_topo[node]['process'])):
434                 bind_list[numa_topo[node]['process'][i]] = str(numa_topo[node]['phy_cores'][i])
435         bind_list.update(loan_bind_list)
436         for key in bind_list.keys():
437             self.host_instance.bind_cpu(bind_list[key], key)
438         print bind_list
439         return True
440
441     def testrun(self, suite):
442         global forward_init_flows, reverse_init_flows
443         try:
444             forward_init_flows = {}
445             reverse_init_flows = {}
446             for key in self.init_flows.keys():
447                 if self.init_flows[key]['direct'] == "forward":
448                     forward_init_flows[key] = self.init_flows[key]
449                 elif self.init_flows[key]['direct'] == "reverse":
450                     reverse_init_flows[key] = self.init_flows[key]
451             forward_init_flows = str(forward_init_flows)
452             reverse_init_flows = str(reverse_init_flows)
453         except:
454             print("[ERROR]init the forward and reverse flow failed.")
455
456         if suite == "throughput":
457             print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test")
458             ret, result = self.send_instace.run_rfc2544_throughput(forward_init_flows, reverse_init_flows)
459         elif suite == "frameloss":
460             print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test")
461             ret, result = self.send_instace.run_rfc2544_frameloss(forward_init_flows, reverse_init_flows)
462         return ret, result