1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
# Traffic directions that are generated for each accepted value of the
# ``direct`` option: 'single' drives only the forward direction, 'double'
# drives forward and reverse.
fwd = {
    'single': ['forward'],
    'double': ['forward', 'reverse'],
}
# Values accepted for the ``direct`` option.
direction = ['single', 'double']
class BaseModel(object):
    """Validate the raw option values shared by the test models.

    ``config`` is a mapping from option name to its value; the checkers
    below read the keys 'model', 'virtenv', 'queues', 'flows', 'direct'
    and 'vlans'.
    """

    def __init__(self, config):
        # NOTE(review): the constructor body is not present in the reviewed
        # listing; every checker reads ``self.config``, so storing the
        # mapping is the minimum this method has to do.
        self.config = config

    def _check_count(self, option, message):
        """Return True when ``config[option]`` is an integer in 1..8.

        A missing or non-numeric value prints *message* and yields False
        instead of falling through to an implicit ``None``.
        """
        try:
            num = int(self.config[option])
        except (KeyError, TypeError, ValueError):
            print(message)
            return False
        return 1 <= num <= 8

    def _check_model(self):
        """Return True when the 'model' option names a known model."""
        return self.config['model'] in models

    def _check_virtenv(self):
        """Return True when 'virtenv' is an integer in 1..8."""
        return self._check_count(
            'virtenv', "[ERROR]The virtenv is not a inter number.")

    def _check_queues(self):
        """Return True when 'queues' is an integer in 1..8."""
        return self._check_count(
            'queues', "[ERROR]The virt queues is not a inter number.")

    def _check_flows(self):
        """Return True when 'flows' is an integer in 1..8."""
        return self._check_count(
            'flows', "[ERROR]The flow is not a inter number.")

    def _check_direct(self):
        """Return True when 'direct' is one of the known directions."""
        return self.config['direct'] in direction

    def _check_vlans(self):
        """Return True when 'vlans' is the string 'True' or 'False'."""
        return self.config['vlans'] in ['True', 'False']

    def _check_bind(self):
        # NOTE(review): the body of this check is not visible in the
        # reviewed listing; it is assumed to accept everything. Confirm
        # against the full file before relying on it.
        return True

    def check_parameter_invalid(self):
        """Return True when every option passes its check, else False.

        Despite the name, a True result means the parameters are valid.
        Checks run in order and stop at the first failure. Any unexpected
        error (for example a missing option) is reported and treated as a
        failed validation rather than propagated.
        """
        checks = (
            self._check_model,
            self._check_virtenv,
            self._check_queues,
            # This entry used to be referenced without being called, so the
            # flows check could never fail.
            self._check_flows,
            self._check_direct,
            self._check_vlans,
            self._check_bind,
        )
        try:
            if all(check() for check in checks):
                return True
            print("[ERROR]Paramter check invalid")
            return False
        except Exception:
            print("[ERROR]Check parameter invalid with unknown reason.")
            return False
def _get_array_values(irq_array):
    """Return the sorted list of the second field of every entry.

    Each element of *irq_array* must be indexable; only its item at
    index 1 is kept. An empty input yields an empty list.
    """
    return sorted(entry[1] for entry in irq_array)
def check_dict(thread_info, flow):
    """Tell whether the observed interrupt processes match the flow table.

    Compares the 'src_recv_irq' and 'dst_send_irq' entries of
    *thread_info* with those of *flow*. The first mismatch is reported
    with a warning and ends the comparison.

    Returns True when both entries agree, False otherwise. The caller
    branches on this result, so it must always be an explicit boolean.
    """
    if thread_info['src_recv_irq'] != flow['src_recv_irq']:
        print("[WARN]Flow src_irq process %s not match %s in the table."
              % (thread_info['src_recv_irq'],
                 flow['src_recv_irq']))
        return False
    if thread_info['dst_send_irq'] != flow['dst_send_irq']:
        print("[WARN]Flow dst_irq process %s not match %s in the table."
              % (thread_info['dst_send_irq'],
                 flow['dst_send_irq']))
        return False
    return True
def dst_ip_update(flow):
    """Advance ``flow['dst_ip']`` to the next address, in place.

    The fourth dot-separated field of the address is incremented by one
    and the first three fields are kept. When the entry is missing or is
    not a dotted address with a numeric fourth field, an error is printed
    and *flow* is left untouched.

    NOTE(review): the last field is not wrapped or range-checked, so
    'x.x.x.255' becomes 'x.x.x.256'.
    """
    try:
        fields = flow['dst_ip'].split('.')
        last = int(fields[3])
    except (AttributeError, IndexError, KeyError, TypeError, ValueError):
        print("[ERROR]dst ip update failed.")
        return
    flow['dst_ip'] = '.'.join(fields[0:3]) + '.' + str(last + 1)
def _tranfer_array_to_range(array):
    """Render *array* as the text '<first>-<last>'.

    Only the first and the last element are used; an empty sequence
    raises IndexError.
    """
    first = str(array[0])
    last = str(array[-1])
    return '-'.join((first, last))
119 class TnV(BaseModel):
# Constructor: hands ``config`` to BaseModel, starts with no host/sender
# handle, and parses the file named by config['configfile'].
# NOTE(review): some statements of this method are missing from this
# listing. Other methods read self.handle, self.vms and self.init_flows;
# presumably the parser created below is stored as self.handle -- confirm
# against the full file.
120 def __init__(self, config):
121 super(TnV, self).__init__(config)
# Both handles start unset; nothing in this listing assigns them.
123 self.host_instance = None
# The attribute is spelled "send_instace" everywhere it is used in this
# class; keep the spelling consistent.
124 self.send_instace = None
# Parse the ini-style file that the getters query by section and option.
127 handle = ConfigParser.ConfigParser()
128 handle.read(self.config['configfile'])
def _get_vms(self):
    """Return what the host handle reports from ``get_libvirt_vms()``.

    The result is passed through unchanged; match_virt_env stores it as
    ``self.vms`` and compares its length with the 'virtenv' option.
    """
    return self.host_instance.get_libvirt_vms()
def flow_match(self):
    """Return True when 'flows' equals 'queues' multiplied by 'virtenv'.

    All three options are converted with int(); a missing or non-numeric
    option raises.
    """
    cfg = self.config
    expected = int(cfg['queues']) * int(cfg['virtenv'])
    return int(cfg['flows']) == expected
def match_virt_env(self):
    """Refresh ``self.vms`` and compare its size with 'virtenv'.

    Returns True when the number of entries returned by ``_get_vms()``
    equals ``int(config['virtenv'])``. Any failure while querying or
    converting is reported and yields an explicit False.
    """
    try:
        self.vms = self._get_vms()
        return len(self.vms) == int(self.config['virtenv'])
    except Exception:
        # The query goes through the host handle, whose failure modes are
        # not visible here, hence the broad handler.
        print("[ERROR]vms or containers number is equal to virtenv.")
        return False
def match_flows_and_nic(self):
    """Check the 'flows' option against the interrupts of both NICs.

    For the 'send' and the 'recv' section, the configured NIC is looked
    up and the host handle is asked for its interrupt processes. Returns
    True only when the number of entries equals ``int(config['flows'])``
    for both NICs. The previous form returned from inside the first loop
    iteration, so the 'recv' NIC was never examined.

    A failure while querying the host or converting the option is
    reported and yields False.
    """
    for section in ('send', 'recv'):
        nic = self._get_nic_from_file(section, 'nic')
        try:
            irq_proc = self.host_instance.get_nic_interrupt_proc(nic)
            if int(self.config['flows']) != len(irq_proc):
                return False
        except Exception:
            print("[ERROR]match flow with nic interrupt failed.")
            return False
    return True
def _get_nic_irq_proc(self, nic):
    """Return the host handle's ``get_nic_interrupt_proc`` result for *nic*."""
    host = self.host_instance
    return host.get_nic_interrupt_proc(nic)
def _get_nic_from_file(self, section, column):
    """Return option *column* of *section* from the parsed config file."""
    value = self.handle.get(section, column)
    return value
def _get_range(self, section, column):
    """Return option *column* of *section* as a list of fields.

    The value is split on runs of whitespace, the same way the MAC and
    VLAN validators split it, so the number of fields read here always
    agrees with the number that was validated. Splitting on a single
    space produced empty fields when entries were separated by more than
    one blank.

    When the option cannot be read, an error is printed and False is
    returned.
    NOTE(review): the failure return value is not visible in the reviewed
    listing; False is assumed -- confirm against the full file.
    """
    try:
        info = self.handle.get(section, column)
        return info.split()
    except Exception:
        print("[ERROR]Get mac failed.")
        return False
def check_mac_valid(self):
    """Validate the 'macs' option of the 'send' and 'recv' sections.

    Each section must list exactly ``int(config['virtenv'])`` addresses
    (and at least one), and every address must be six colon-separated
    pairs of hexadecimal digits. The previous pattern accepted any
    character in place of a hex digit and ignored trailing text.

    Returns True when both sections pass; prints the reason and returns
    False otherwise, including when the option cannot be read.
    """
    mac_pattern = re.compile(r'[0-9A-Fa-f]{2}(?::[0-9A-Fa-f]{2}){5}$')
    try:
        expected = int(self.config['virtenv'])
        for option in ('send', 'recv'):
            macs = self.handle.get(option, 'macs').split()
            if not macs or len(macs) != expected:
                print("[ERROR]The macs number is not equal to vms or containers.")
                return False
            for mac in macs:
                if not mac_pattern.match(mac):
                    print("[ERROR]mac %s invalid" % mac)
                    return False
    except Exception:
        print("[ERROR]parse macs failed.")
        return False
    return True
def check_vlan_valid(self):
    """Validate the 'vlans' option of the 'send' and 'recv' sections.

    Each section must list exactly ``int(config['virtenv'])`` identifiers
    and every identifier must be an integer strictly between 1 and 4095.
    A non-numeric identifier is reported as invalid instead of raising.

    Returns True when both sections pass, False otherwise.
    """
    expected = int(self.config['virtenv'])
    for direct in ('send', 'recv'):
        vlans = self.handle.get(direct, 'vlans').split()
        if len(vlans) != expected:
            print("[ERROR]vlan un config")
            return False
        for vlan in vlans:
            try:
                vlan_number = int(vlan)
            except ValueError:
                print("[ERROR]vlan %s invalid" % vlan)
                return False
            if vlan_number <= 1 or vlan_number >= 4095:
                print("[ERROR]vlan %s invalid" % vlan)
                return False
    return True
def check_logic_invalid(self):
    """Run the cross-option consistency checks in order.

    Despite the name, a truthy result means every check passed. The
    checks short-circuit, so later ones are skipped after the first
    failure. ``match_flows_and_nic`` used to be referenced without being
    called, which made that step always pass.
    """
    return self.flow_match() and self.match_virt_env() and \
        self.match_flows_and_nic() and self.check_mac_valid() and \
        self.check_vlan_valid()
# Build self.init_flows: one entry per direction, VM and queue, keyed
# '<direct>_<vm>_<queue>', each holding a deep copy of the flow description
# assembled below. Returns self.init_flows.
# NOTE(review): temp_flow, i, j, vlan_id and forward_core are used below
# but their definitions/updates are missing from this listing; the
# comments here describe only the visible statements.
218 def read_flow_init(self):
# Per-VM lists read from the 'send' and 'recv' sections of the config file.
221 src_macs = self._get_range('send', 'macs')
222 dst_macs = self._get_range('recv', 'macs')
223 src_vlan = self._get_range('send', 'vlans')
224 dst_vlan = self._get_range('recv', 'vlans')
225 src_nic = self._get_nic_from_file('send', 'nic')
226 dst_nic = self._get_nic_from_file('recv', 'nic')
# Sorted second field of every interrupt entry reported for each NIC.
227 src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic))
228 dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic))
229 src_ip_sections = self._get_range('send', 'ip_sections')
230 dst_ip_sections = self._get_range('recv', 'ip_sections')
231 send_port = self._get_nic_from_file('send', 'port')
232 recv_port = self._get_nic_from_file('recv', 'port')
233 temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip')
# Flows per VM. NOTE(review): with Python 3 '/' yields a float, which
# cannot be used in the list index computed further down; the module's use
# of ConfigParser.ConfigParser suggests Python 2, where this is an integer
# division -- confirm the target interpreter.
235 avg_flow = int(self.config['flows']) / int(self.config['virtenv'])
236 # build the main dictionary
# Directions are taken from fwd for the configured 'direct' value.
237 for _direct in sorted(fwd[self.config['direct']]):
240 temp_flow['direct'] = _direct
241 temp_flow['send_port'] = send_port
242 temp_flow['recv_port'] = recv_port
244 for _vm in sorted(self.vms):
248 temp_flow['virt'] = _vm
# Per-VM details come from the host handle; the keys read here are
# 'main_pid', 'mem_numa' and 'qemu_thread'.
249 _vm_info = self.host_instance.get_vm_info(_vm)
250 temp_flow['qemu_proc'] = _vm_info['main_pid']
251 # temp_flow['qemu_thread'] = _vm_info['qemu_thread']
252 temp_flow['mem_numa'] = _vm_info['mem_numa']
253 # temp_flow['vhost_thread'] = _vm_info['vhost_thread']
# i selects this VM's entry in the per-VM lists.
255 temp_flow['src_mac'] = src_macs[i]
256 temp_flow['dst_mac'] = dst_macs[i]
# vlan_id is indexed by the 'vlans' option, which BaseModel restricts to
# the strings 'True' / 'False'.
257 temp_flow['vlan'] = vlan_id[self.config['vlans']]
258 src_ip = src_ip_sections[i]
259 dst_ip = dst_ip_sections[i]
260 temp_flow['src_ip'] = src_ip
261 temp_flow['dst_ip'] = dst_ip
262 vm_index = sorted(self.vms).index(_vm)
# Queues are numbered from 1.
263 for _queue in range(1, int(self.config['queues']) + 1):
265 temp_flow['queue'] = _queue
268 temp_flow['qemu_thread_list'] = _vm_info['qemu_thread']
# Forwarding thread per direction: the reverse entry sits 'flows' positions
# after the forward one in the VM's qemu_thread list.
270 "forward": _vm_info['qemu_thread'][_queue + avg_flow * vm_index],
271 "reverse": _vm_info['qemu_thread'][_queue + avg_flow * vm_index + int(self.config['flows'])]
273 temp_flow['fwd_thread'] = forward_core[_direct]
# Left unset here; set_thread2flow fills it in later.
275 temp_flow['fwd_vhost'] = None
276 # nic interrupts info
277 temp_flow['src_recv_irq'] = src_nic_irq[j]
278 temp_flow['src_nic'] = src_nic
279 temp_flow['dst_send_irq'] = dst_nic_irq[j]
280 temp_flow['dst_nic'] = dst_nic
# Deep copy so later changes to temp_flow do not alter the stored entry.
283 self.init_flows[_direct + '_' + _vm + '_' + str(_queue)] = copy.deepcopy(temp_flow)
# Swap the source and destination roles; presumably this runs once per
# direction so the next direction sees them reversed -- the nesting is not
# visible in this listing.
285 src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq
287 send_port, recv_port = recv_port, send_port
288 src_nic, dst_nic = dst_nic, src_nic
289 src_macs, dst_macs = dst_macs, src_macs
290 src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections
291 # return sorted(self.init_flows.iteritems(), key=lambda d:d[0])
292 return self.init_flows
# Forward a MAC-learning request for a pair of flows to the sender handle.
# NOTE(review): statements before and after the call are missing from this
# listing, so any argument conversion and the return value are not shown.
294 def mac_learning(self, flowa, flowb):
297 ret = self.send_instace.mac_learning(flowa, flowb)
# Start traffic for ``flow`` through the sender handle and return what it
# returns (described below as a stream block handle).
# NOTE(review): one statement between the signature and the comment is
# missing from this listing.
300 def send_packet(self, flow):
302 # return a stream block handle
303 return self.send_instace.send_packet(flow)
# Stop the traffic identified by ``streamblock`` for ``flow`` through the
# sender handle and return its result.
# NOTE(review): one statement before the return is missing from this listing.
305 def stop_flow(self, streamblock, flow):
307 return self.send_instace.stop_flow(streamblock, flow)
def catch_thread_info(self):
    """Return the host handle's ``catch_thread_info()`` result unchanged.

    flow_build unpacks the result into two values.
    """
    host = self.host_instance
    return host.catch_thread_info()
# Copy the observed 'fwd_vhost' value from thread_info into the flow
# entry, modifying ``flow`` in place.
# NOTE(review): the statements after the assignment are missing from this
# listing, so the return value is not shown.
312 def set_thread2flow(self, thread_info, flow):
313 flow['fwd_vhost'] = thread_info['fwd_vhost']
# For every direction and queue (the loop that binds _vm is missing from
# this listing): run MAC learning, send traffic for the flow, capture the
# host's thread info while it runs, stop the traffic, and record the
# observed vhost thread when the interrupts match the table. Returns
# self.init_flows.
# NOTE(review): the try/except/if lines that structure this method are
# missing from this listing; the comments describe only visible statements.
317 def flow_build(self):
318 for _direct in fwd[self.config['direct']]:
320 for _queue in range(1, int(self.config['queues']) + 1):
# MAC learning pairs this flow with the same VM/queue entry of the
# opposite direction (looked up through reverse_dict).
326 self.mac_learning(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)],
327 self.init_flows[reverse_dict[_direct] + '_' + _vm + '_' + str(_queue)])
328 streamblock = self.send_packet(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
330 result, thread_info = self.catch_thread_info()
# NOTE(review): eval() executes whatever text the host handle returned.
# If that text can come from an untrusted peer this is a code-execution
# risk; ast.literal_eval would be the safer parser -- confirm the format.
331 thread_info = eval(thread_info)
332 self.stop_flow(streamblock, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
335 print("[ERROR]Catch the thread info failed.")
338 print("[ERROR]send flow failed error or get host thread info failed.")
340 # compare the got thread info to
# check_dict compares 'src_recv_irq' and 'dst_send_irq' of both arguments.
341 if check_dict(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]):
342 self.set_thread2flow(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
343 print("[INFO]Flow %s_%s_%s : fwd_vhost %s src_recv_irq %s dst_send_irq %s"
344 % (_direct, _vm, _queue, thread_info['fwd_vhost'], thread_info['src_recv_irq'],
345 thread_info['dst_send_irq']))
346 print("%s" % (self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]))
# Advance the flow's destination address by one (see dst_ip_update).
349 dst_ip_update(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
350 return self.init_flows
# Pin the threads recorded in self.init_flows to CPU cores: collect the
# thread/interrupt identifiers per role, read the node assignment for each
# role from the strategy file, place them on that node's cores, move any
# overflow to the "loan" node, and ask the host handle to bind each one.
# NOTE(review): the initialisers of qemu_list, qemu_other, src_vhost,
# dst_vhost, src_irq, dst_irq, loan_cores, loan_bind_list and bind_list,
# and the try/except lines, are missing from this listing. The aff_strategy
# parameter is not used by any visible statement.
352 def affinity_bind(self, aff_strategy):
353 # get the forward cores
361 # recognize the thread id
362 for flowname in sorted(self.init_flows.keys()):
363 tmp_thread = self.init_flows[flowname]['fwd_thread']
364 qemu_other = qemu_other + copy.deepcopy(self.init_flows[flowname]['qemu_thread_list'])
365 qemu_list.append(tmp_thread)
# For reverse flows the source/destination roles are mirrored, so the
# values go into the opposite lists.
366 if self.init_flows[flowname]['direct'] == 'forward':
367 dst_vhost.append(self.init_flows[flowname]['fwd_vhost'])
368 src_irq.append(self.init_flows[flowname]['src_recv_irq'])
369 dst_irq.append(self.init_flows[flowname]['dst_send_irq'])
370 elif self.init_flows[flowname]['direct'] == 'reverse':
371 src_vhost.append(self.init_flows[flowname]['fwd_vhost'])
372 dst_irq.append(self.init_flows[flowname]['src_recv_irq'])
373 src_irq.append(self.init_flows[flowname]['dst_send_irq'])
# {}.fromkeys(x).keys() removes duplicates; sorted() gives a stable order.
375 qemu_list = sorted({}.fromkeys(qemu_list).keys())
376 src_vhost = sorted({}.fromkeys(src_vhost).keys())
377 dst_vhost = sorted({}.fromkeys(dst_vhost).keys())
378 src_irq = sorted({}.fromkeys(src_irq).keys())
379 dst_irq = sorted({}.fromkeys(dst_irq).keys())
381 # get the qemu thread except the forward core
# NOTE(review): the visible statement only de-duplicates qemu_other; the
# step that removes the forward threads is not shown in this listing.
382 qemu_other = sorted({}.fromkeys(qemu_other).keys())
385 # get the bind strategy
386 handle = ConfigParser.ConfigParser()
387 handle.read(self.config['strategyfile'])
# All options are read from the section 'strategy' + config['strategy'].
389 qemu_numa = handle.get('strategy' + self.config['strategy'], 'qemu_numa')
390 src_vhost_numa = handle.get('strategy' + self.config['strategy'], 'src_vhost_numa')
391 dst_vhost_numa = handle.get('strategy' + self.config['strategy'], 'dst_vhost_numa')
392 src_irq_numa = handle.get('strategy' + self.config['strategy'], 'src_irq_numa')
393 dst_irq_numa = handle.get('strategy' + self.config['strategy'], 'dst_irq_numa')
394 loan_numa = handle.get('strategy' + self.config['strategy'], 'loan_numa')
396 print("[ERROR]Parse the strategy file failed or get the options failed.")
398 for value in [qemu_numa, src_vhost_numa, dst_vhost_numa, src_irq_numa, dst_irq_numa, loan_numa]:
# NOTE(review): this condition is true for every value that is not None,
# so a successfully read option raises ValueError. The message suggests
# the intent was `value is None or value == ''` -- confirm and fix.
399 if value is not None or value == '':
400 raise ValueError('some option in the strategy file is none.')
401 # cores mapping thread
402 numa_topo = self.host_instance.get_numa_core()
# NOTE(review): eval() executes the text returned by the host handle;
# unsafe if that text can come from an untrusted peer -- confirm the format.
403 numa_topo = eval(numa_topo)
404 # first check the cores number
406 # order src_irq dst_irq src_vhost dst_vhost qemu_list
# Node keys are compared against 'node' + <option value>. Each node gets a
# 'process' list filled in the fixed order named above.
407 for node in numa_topo.keys():
408 numa_topo[node]['process'] = []
409 if 'node' + src_irq_numa == node:
410 numa_topo[node]['process'] = numa_topo[node]['process'] + src_irq
411 if 'node' + dst_irq_numa == node:
412 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_irq
413 if 'node' + src_vhost_numa == node:
414 numa_topo[node]['process'] = numa_topo[node]['process'] + src_vhost
415 if 'node' + dst_vhost_numa == node:
416 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_vhost
417 if 'node' + qemu_numa == node:
418 numa_topo[node]['process'] = numa_topo[node]['process'] + qemu_list
# When a node has more entries than 'phy_cores', the last `diff` entries
# are appended to the loan node's list and trimmed from this node.
420 for node in numa_topo.keys():
421 if len(numa_topo[node]['process']) > len(numa_topo[node]['phy_cores']):
423 diff = len(numa_topo[node]['process']) - len(numa_topo[node]['phy_cores'])
425 numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa]['process'] + copy.deepcopy(
426 numa_topo[node]['process'][-diff:])
# Core range text '<first>-<last>' of the loan node's cores from index diff.
427 cores_str = _tranfer_array_to_range(numa_topo['node' + loan_numa]['phy_cores'][diff:])
428 loan_cores = ','.join([loan_cores, cores_str])
429 numa_topo[node]['process'] = numa_topo[node]['process'][0:-diff]
# Drops the first character of loan_cores; presumably the leading ',' left
# by joining onto an initially empty string -- the initial value is not
# visible here.
430 loan_cores = loan_cores[1:]
# Every entry of qemu_other is bound to the whole loan core range.
432 for proc_loan in qemu_other:
433 loan_bind_list[proc_loan] = loan_cores
# The i-th entry of a node's 'process' list is bound to its i-th phy_core.
436 for node in numa_topo.keys():
437 for i in range(len(numa_topo[node]['process'])):
438 bind_list[numa_topo[node]['process'][i]] = str(numa_topo[node]['phy_cores'][i])
# Entries present in both maps take the loan binding.
439 bind_list.update(loan_bind_list)
440 for key in bind_list.keys():
441 self.host_instance.bind_cpu(bind_list[key], key)
445 def testrun(self, suite):
446 global forward_init_flows, reverse_init_flows
448 forward_init_flows = {}
449 reverse_init_flows = {}
450 for key in self.init_flows.keys():
451 if self.init_flows[key]['direct'] == "forward":
452 forward_init_flows[key] = self.init_flows[key]
453 elif self.init_flows[key]['direct'] == "reverse":
454 reverse_init_flows[key] = self.init_flows[key]
455 forward_init_flows = str(forward_init_flows)
456 reverse_init_flows = str(reverse_init_flows)
458 print("[ERROR]init the forward and reverse flow failed.")
460 if suite == "throughput":
461 print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test")
462 ret, result = self.send_instace.run_rfc2544_throughput(forward_init_flows, reverse_init_flows)
463 elif suite == "frameloss":
464 print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test")
465 ret, result = self.send_instace.run_rfc2544_frameloss(forward_init_flows, reverse_init_flows)