1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
15 fwd = {'single': ['forward'],
16 'double': ['forward', 'reverse']
19 direction = ['single', 'double']
26 class BaseModel(object):
28 def __init__(self, config):
31 def _check_model(self):
32 return self.config['model'] in models
34 def _check_virtenv(self):
36 num = int(self.config['virtenv'])
37 return num in range(1, 9)
39 print("[ERROR]The virtenv is not a inter number.")
41 def _check_queues(self):
43 num = int(self.config['queues'])
44 return num in range(1, 9)
46 print("[ERROR]The virt queues is not a inter number.")
49 def _check_flows(self):
51 num = int(self.config['flows'])
52 return num in range(1, 9)
54 print("[ERROR]The flow is not a inter number.")
56 def _check_direct(self):
57 return self.config['direct'] in direction
59 def _check_vlans(self):
60 return self.config['vlans'] in ['True', 'False']
62 def _check_bind(self):
65 def check_parameter_invalid(self):
67 if self._check_model() and \
68 self._check_virtenv() and \
69 self._check_queues() and \
70 self._check_flows and \
71 self._check_direct() and \
72 self._check_vlans() and \
76 print("[ERROR]Paramter check invalid")
79 print("[ERROR]Check parameter invalid with unknown reason.")
83 def _get_array_values(irq_array):
85 for i in range(len(irq_array)):
86 proc_list.append(irq_array[i][1])
87 return sorted(proc_list)
90 def check_dict(thread_info, flow):
91 if thread_info['src_recv_irq'] != flow['src_recv_irq']:
92 print("[WARN]Flow src_irq process %s not match %s in the table."
93 % (thread_info['src_recv_irq'],
94 flow['src_recv_irq']))
96 if thread_info['dst_send_irq'] != flow['dst_send_irq']:
97 print("[WARN]Flow dst_irq process %s not match %s in the table."
98 % (thread_info['dst_send_irq'],
99 flow['dst_send_irq']))
104 def dst_ip_update(flow):
106 src_dst_ip = flow['dst_ip']
107 ip_section = '.'.join(src_dst_ip.split('.')[0:3]) + '.'
108 number = int(src_dst_ip.split('.')[3])
109 new_number = number + 1
110 new_dst_ip = ip_section + str(new_number)
111 flow['dst_ip'] = new_dst_ip
113 print("[ERROR]dst ip update failed.")
116 def _tranfer_array_to_range(array):
117 return str(array[0]) + '-' + str(array[-1])
120 class TnV(BaseModel):
122 def __init__(self, config):
123 super(TnV, self).__init__(config)
125 self.host_instance = None
126 self.send_instace = None
129 handle = ConfigParser.ConfigParser()
130 handle.read(self.config['configfile'])
134 return self.host_instance.get_libvirt_vms()
136 def flow_match(self):
137 _queues = int(self.config['queues'])
138 _virtenv = int(self.config['virtenv'])
139 _flows = int(self.config['flows'])
140 return _flows == _queues * _virtenv
142 def match_virt_env(self):
144 self.vms = self._get_vms()
145 return len(self.vms) == int(self.config['virtenv'])
147 print("[ERROR]vms or containers number is equal to virtenv.")
151 def match_flows_and_nic(self):
153 for section in ['send', 'recv']:
154 nic = self._get_nic_from_file(section, 'nic')
156 irq_proc = self.host_instance.get_nic_interrupt_proc(nic)
157 return int(self.config['flows']) == len(irq_proc)
159 print("[ERROR]match flow with nic interrupt failed.")
162 def _get_nic_irq_proc(self, nic):
163 return self.host_instance.get_nic_interrupt_proc(nic)
165 def _get_nic_from_file(self, section, column):
166 return self.handle.get(section, column)
168 def _get_range(self, section, column):
170 info = self.handle.get(section, column)
171 return info.split(' ')
173 print("[ERROR]Get mac failed.")
176 def check_mac_valid(self):
179 for option in ['send', 'recv']:
180 info = self.handle.get(option, 'macs')
182 if len(macs) != int(self.config['virtenv']) or macs == []:
184 "[ERROR]The macs number is not equal to vms or containers.")
188 if re.match(r'..:..:..:..:..:..', mac):
191 print("[ERROR]mac %s invalid" % mac)
198 print("[ERROR]parse macs failed.")
201 def check_vlan_valid(self):
203 for direct in ['send', 'recv']:
204 vlans = self.handle.get(direct, 'vlans').split()
205 if len(vlans) != int(self.config['virtenv']):
206 print("[ERROR]vlan un config")
209 if int(vlan) <= 1 or int(vlan) >= 4095:
215 def check_logic_invalid(self):
216 return self.flow_match() and self.match_virt_env() and \
217 self.match_flows_and_nic and self.check_mac_valid() and \
218 self.check_vlan_valid()
221 def read_flow_init(self):
224 src_macs = self._get_range('send', 'macs')
225 dst_macs = self._get_range('recv', 'macs')
226 src_vlan = self._get_range('send', 'vlans')
227 dst_vlan = self._get_range('recv', 'vlans')
228 src_nic = self._get_nic_from_file('send', 'nic')
229 dst_nic = self._get_nic_from_file('recv', 'nic')
230 src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic))
231 dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic))
232 src_ip_sections = self._get_range('send', 'ip_sections')
233 dst_ip_sections = self._get_range('recv', 'ip_sections')
234 send_port = self._get_nic_from_file('send', 'port')
235 recv_port = self._get_nic_from_file('recv', 'port')
236 temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip')
238 avg_flow = int(self.config['flows']) / int(self.config['virtenv'])
239 # build the main dictionary
240 for _direct in sorted(fwd[self.config['direct']]):
243 temp_flow['direct'] = _direct
244 temp_flow['send_port'] = send_port
245 temp_flow['recv_port'] = recv_port
247 for _vm in sorted(self.vms):
251 temp_flow['virt'] = _vm
252 _vm_info = self.host_instance.get_vm_info(_vm)
253 temp_flow['qemu_proc'] = _vm_info['main_pid']
254 # temp_flow['qemu_thread'] = _vm_info['qemu_thread']
255 temp_flow['mem_numa'] = _vm_info['mem_numa']
256 # temp_flow['vhost_thread'] = _vm_info['vhost_thread']
258 temp_flow['src_mac'] = src_macs[i]
259 temp_flow['dst_mac'] = dst_macs[i]
260 temp_flow['vlan'] = vlan_id[self.config['vlans']]
261 src_ip = src_ip_sections[i]
262 dst_ip = dst_ip_sections[i]
263 temp_flow['src_ip'] = src_ip
264 temp_flow['dst_ip'] = dst_ip
265 vm_index = sorted(self.vms).index(_vm)
266 for _queue in range(1, int(self.config['queues']) + 1):
268 temp_flow['queue'] = _queue
271 temp_flow['qemu_thread_list'] = _vm_info['qemu_thread']
273 "forward": _vm_info['qemu_thread'][
274 _queue + avg_flow * vm_index],
275 "reverse": _vm_info['qemu_thread'][
276 _queue + avg_flow * vm_index + int(
277 self.config['flows'])]}
278 temp_flow['fwd_thread'] = forward_core[_direct]
280 temp_flow['fwd_vhost'] = None
281 # nic interrupts info
282 temp_flow['src_recv_irq'] = src_nic_irq[j]
283 temp_flow['src_nic'] = src_nic
284 temp_flow['dst_send_irq'] = dst_nic_irq[j]
285 temp_flow['dst_nic'] = dst_nic
288 self.init_flows[_direct + '_' + _vm + '_' +
289 str(_queue)] = copy.deepcopy(temp_flow)
291 src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq
293 send_port, recv_port = recv_port, send_port
294 src_nic, dst_nic = dst_nic, src_nic
295 src_macs, dst_macs = dst_macs, src_macs
296 src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections
297 # return sorted(self.init_flows.iteritems(), key=lambda d:d[0])
298 return self.init_flows
300 def mac_learning(self, flowa, flowb):
303 ret = self.send_instace.mac_learning(flowa, flowb)
306 def send_packet(self, flow):
308 # return a stream block handle
309 return self.send_instace.send_packet(flow)
311 def stop_flow(self, streamblock, flow):
313 return self.send_instace.stop_flow(streamblock, flow)
315 def catch_thread_info(self):
316 return self.host_instance.catch_thread_info()
318 def set_thread2flow(self, thread_info, flow):
319 flow['fwd_vhost'] = thread_info['fwd_vhost']
323 def flow_build(self):
324 for _direct in fwd[self.config['direct']]:
326 for _queue in range(1, int(self.config['queues']) + 1):
340 reverse_dict[_direct] +
345 streamblock = self.send_packet(
346 self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
348 result, thread_info = self.catch_thread_info()
349 thread_info = eval(thread_info)
351 streamblock, self.init_flows[
352 _direct + '_' + _vm + '_' + str(_queue)])
355 print("[ERROR]Catch the thread info failed.")
359 "[ERROR]send flow failed error or get host thread info failed.")
361 # compare the got thread info to
363 thread_info, self.init_flows[
364 _direct + '_' + _vm + '_' + str(_queue)]):
365 self.set_thread2flow(
366 thread_info, self.init_flows[
367 _direct + '_' + _vm + '_' + str(_queue)])
369 "[INFO]Flow %s_%s_%s : fwd_vhost %s src_recv_irq %s dst_send_irq %s" %
373 thread_info['fwd_vhost'],
374 thread_info['src_recv_irq'],
375 thread_info['dst_send_irq']))
393 return self.init_flows
395 def affinity_bind(self, aff_strategy):
396 # get the forward cores
404 # recognize the thread id
405 for flowname in sorted(self.init_flows.keys()):
406 tmp_thread = self.init_flows[flowname]['fwd_thread']
407 qemu_other = qemu_other + \
408 copy.deepcopy(self.init_flows[flowname]['qemu_thread_list'])
409 qemu_list.append(tmp_thread)
410 if self.init_flows[flowname]['direct'] == 'forward':
411 dst_vhost.append(self.init_flows[flowname]['fwd_vhost'])
412 src_irq.append(self.init_flows[flowname]['src_recv_irq'])
413 dst_irq.append(self.init_flows[flowname]['dst_send_irq'])
414 elif self.init_flows[flowname]['direct'] == 'reverse':
415 src_vhost.append(self.init_flows[flowname]['fwd_vhost'])
416 dst_irq.append(self.init_flows[flowname]['src_recv_irq'])
417 src_irq.append(self.init_flows[flowname]['dst_send_irq'])
419 qemu_list = sorted({}.fromkeys(qemu_list).keys())
420 src_vhost = sorted({}.fromkeys(src_vhost).keys())
421 dst_vhost = sorted({}.fromkeys(dst_vhost).keys())
422 src_irq = sorted({}.fromkeys(src_irq).keys())
423 dst_irq = sorted({}.fromkeys(dst_irq).keys())
425 # get the qemu thread except the forward core
426 qemu_other = sorted({}.fromkeys(qemu_other).keys())
429 # get the bind strategy
430 handle = ConfigParser.ConfigParser()
431 handle.read(self.config['strategyfile'])
433 qemu_numa = handle.get(
435 self.config['strategy'],
437 src_vhost_numa = handle.get(
438 'strategy' + self.config['strategy'],
440 dst_vhost_numa = handle.get(
441 'strategy' + self.config['strategy'],
443 src_irq_numa = handle.get(
445 self.config['strategy'],
447 dst_irq_numa = handle.get(
449 self.config['strategy'],
451 loan_numa = handle.get(
453 self.config['strategy'],
456 print("[ERROR]Parse the strategy file failed or get the options failed.")
465 if value is not None or value == '':
466 raise ValueError('some option in the strategy file is none.')
467 # cores mapping thread
468 numa_topo = self.host_instance.get_numa_core()
469 numa_topo = eval(numa_topo)
470 # first check the cores number
472 # order src_irq dst_irq src_vhost dst_vhost qemu_list
473 for node in numa_topo.keys():
474 numa_topo[node]['process'] = []
475 if 'node' + src_irq_numa == node:
476 numa_topo[node]['process'] = numa_topo[
477 node]['process'] + src_irq
478 if 'node' + dst_irq_numa == node:
479 numa_topo[node]['process'] = numa_topo[
480 node]['process'] + dst_irq
481 if 'node' + src_vhost_numa == node:
482 numa_topo[node]['process'] = numa_topo[
483 node]['process'] + src_vhost
484 if 'node' + dst_vhost_numa == node:
485 numa_topo[node]['process'] = numa_topo[
486 node]['process'] + dst_vhost
487 if 'node' + qemu_numa == node:
488 numa_topo[node]['process'] = numa_topo[
489 node]['process'] + qemu_list
491 for node in numa_topo.keys():
493 numa_topo[node]['process']) > len(
494 numa_topo[node]['phy_cores']):
496 diff = len(numa_topo[node]['process']) - \
497 len(numa_topo[node]['phy_cores'])
499 numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa][
500 'process'] + copy.deepcopy(numa_topo[node]['process'][-diff:])
501 cores_str = _tranfer_array_to_range(
504 loan_numa]['phy_cores'][
506 loan_cores = ','.join([loan_cores, cores_str])
507 numa_topo[node]['process'] = numa_topo[
508 node]['process'][0:-diff]
509 loan_cores = loan_cores[1:]
511 for proc_loan in qemu_other:
512 loan_bind_list[proc_loan] = loan_cores
515 for node in numa_topo.keys():
516 for i in range(len(numa_topo[node]['process'])):
517 bind_list[numa_topo[node]['process'][i]] = str(
518 numa_topo[node]['phy_cores'][i])
519 bind_list.update(loan_bind_list)
520 for key in bind_list.keys():
521 self.host_instance.bind_cpu(bind_list[key], key)
525 def testrun(self, suite):
526 global forward_init_flows, reverse_init_flows
528 forward_init_flows = {}
529 reverse_init_flows = {}
530 for key in self.init_flows.keys():
531 if self.init_flows[key]['direct'] == "forward":
532 forward_init_flows[key] = self.init_flows[key]
533 elif self.init_flows[key]['direct'] == "reverse":
534 reverse_init_flows[key] = self.init_flows[key]
535 forward_init_flows = str(forward_init_flows)
536 reverse_init_flows = str(reverse_init_flows)
538 print("[ERROR]init the forward and reverse flow failed.")
540 if suite == "throughput":
541 print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test")
542 ret, result = self.send_instace.run_rfc2544_throughput(
543 forward_init_flows, reverse_init_flows)
544 elif suite == "frameloss":
545 print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test")
546 ret, result = self.send_instace.run_rfc2544_frameloss(
547 forward_init_flows, reverse_init_flows)