11 fwd = {'single': ['forward'],
12 'double': ['forward', 'reverse']
15 direction = ['single', 'double']
22 class BaseModel(object):
23 def __init__(self, config):
26 def _check_model(self):
27 return self.config['model'] in models
29 def _check_virtenv(self):
31 num = int(self.config['virtenv'])
32 return num in range(1, 9)
34 print("[ERROR]The virtenv is not a inter number.")
36 def _check_queues(self):
38 num = int(self.config['queues'])
39 return num in range(1, 9)
41 print("[ERROR]The virt queues is not a inter number.")
44 def _check_flows(self):
46 num = int(self.config['flows'])
47 return num in range(1, 9)
49 print("[ERROR]The flow is not a inter number.")
51 def _check_direct(self):
52 return self.config['direct'] in direction
54 def _check_vlans(self):
55 return self.config['vlans'] in ['True', 'False']
57 def _check_bind(self):
60 def check_parameter_invalid(self):
62 if self._check_model() and \
63 self._check_virtenv() and \
64 self._check_queues() and \
65 self._check_flows and \
66 self._check_direct() and \
67 self._check_vlans() and \
71 print("[ERROR]Paramter check invalid")
74 print("[ERROR]Check parameter invalid with unknown reason.")
78 def _get_array_values(irq_array):
80 for i in range(len(irq_array)):
81 proc_list.append(irq_array[i][1])
82 return sorted(proc_list)
85 def check_dict(thread_info, flow):
86 if thread_info['src_recv_irq'] != flow['src_recv_irq']:
87 print("[WARN]Flow src_irq process %s not match %s in the table."
88 % (thread_info['src_recv_irq'],
89 flow['src_recv_irq']))
91 if thread_info['dst_send_irq'] != flow['dst_send_irq']:
92 print("[WARN]Flow dst_irq process %s not match %s in the table."
93 % (thread_info['dst_send_irq'],
94 flow['dst_send_irq']))
99 def dst_ip_update(flow):
101 src_dst_ip = flow['dst_ip']
102 ip_section = '.'.join(src_dst_ip.split('.')[0:3]) + '.'
103 number = int(src_dst_ip.split('.')[3])
104 new_number = number + 1
105 new_dst_ip = ip_section + str(new_number)
106 flow['dst_ip'] = new_dst_ip
108 print("[ERROR]dst ip update failed.")
111 def _tranfer_array_to_range(array):
112 return str(array[0]) + '-' + str(array[-1])
115 class TnV(BaseModel):
116 def __init__(self, config):
117 super(TnV, self).__init__(config)
119 self.host_instance = None
120 self.send_instace = None
123 handle = ConfigParser.ConfigParser()
124 handle.read(self.config['configfile'])
128 return self.host_instance.get_libvirt_vms()
130 def flow_match(self):
131 _queues = int(self.config['queues'])
132 _virtenv = int(self.config['virtenv'])
133 _flows = int(self.config['flows'])
134 return _flows == _queues * _virtenv
136 def match_virt_env(self):
138 self.vms = self._get_vms()
139 return len(self.vms) == int(self.config['virtenv'])
141 print("[ERROR]vms or containers number is equal to virtenv.")
145 def match_flows_and_nic(self):
147 for section in ['send', 'recv']:
148 nic = self._get_nic_from_file(section, 'nic')
150 irq_proc = self.host_instance.get_nic_interrupt_proc(nic)
151 return int(self.config['flows']) == len(irq_proc)
153 print("[ERROR]match flow with nic interrupt failed.")
156 def _get_nic_irq_proc(self, nic):
157 return self.host_instance.get_nic_interrupt_proc(nic)
159 def _get_nic_from_file(self, section, column):
160 return self.handle.get(section, column)
162 def _get_range(self, section, column):
164 info = self.handle.get(section, column)
165 return info.split(' ')
167 print("[ERROR]Get mac failed.")
170 def check_mac_valid(self):
173 for option in ['send', 'recv']:
174 info = self.handle.get(option, 'macs')
176 if len(macs) != int(self.config['virtenv']) or macs == []:
177 print("[ERROR]The macs number is not equal to vms or containers.")
181 if re.match(r'..:..:..:..:..:..', mac):
184 print("[ERROR]mac %s invalid" % mac)
191 print("[ERROR]parse macs failed.")
194 def check_vlan_valid(self):
196 for direct in ['send', 'recv']:
197 vlans = self.handle.get(direct, 'vlans').split()
198 if len(vlans) != int(self.config['virtenv']):
199 print("[ERROR]vlan un config")
202 if int(vlan) <= 1 or int(vlan) >= 4095:
208 def check_logic_invalid(self):
209 return self.flow_match() and self.match_virt_env() and \
210 self.match_flows_and_nic and self.check_mac_valid() and \
211 self.check_vlan_valid()
214 def read_flow_init(self):
217 src_macs = self._get_range('send', 'macs')
218 dst_macs = self._get_range('recv', 'macs')
219 src_vlan = self._get_range('send', 'vlans')
220 dst_vlan = self._get_range('recv', 'vlans')
221 src_nic = self._get_nic_from_file('send', 'nic')
222 dst_nic = self._get_nic_from_file('recv', 'nic')
223 src_nic_irq = _get_array_values(self._get_nic_irq_proc(src_nic))
224 dst_nic_irq = _get_array_values(self._get_nic_irq_proc(dst_nic))
225 src_ip_sections = self._get_range('send', 'ip_sections')
226 dst_ip_sections = self._get_range('recv', 'ip_sections')
227 send_port = self._get_nic_from_file('send', 'port')
228 recv_port = self._get_nic_from_file('recv', 'port')
229 temp_flow['tester_ip'] = self._get_nic_from_file('common', 'tester_ip')
231 avg_flow = int(self.config['flows']) / int(self.config['virtenv'])
232 # build the main dictionary
233 for _direct in sorted(fwd[self.config['direct']]):
236 temp_flow['direct'] = _direct
237 temp_flow['send_port'] = send_port
238 temp_flow['recv_port'] = recv_port
240 for _vm in sorted(self.vms):
244 temp_flow['virt'] = _vm
245 _vm_info = self.host_instance.get_vm_info(_vm)
246 temp_flow['qemu_proc'] = _vm_info['main_pid']
247 # temp_flow['qemu_thread'] = _vm_info['qemu_thread']
248 temp_flow['mem_numa'] = _vm_info['mem_numa']
249 # temp_flow['vhost_thread'] = _vm_info['vhost_thread']
251 temp_flow['src_mac'] = src_macs[i]
252 temp_flow['dst_mac'] = dst_macs[i]
253 temp_flow['vlan'] = vlan_id[self.config['vlans']]
254 src_ip = src_ip_sections[i]
255 dst_ip = dst_ip_sections[i]
256 temp_flow['src_ip'] = src_ip
257 temp_flow['dst_ip'] = dst_ip
258 vm_index = sorted(self.vms).index(_vm)
259 for _queue in range(1, int(self.config['queues']) + 1):
261 temp_flow['queue'] = _queue
264 temp_flow['qemu_thread_list'] = _vm_info['qemu_thread']
266 "forward": _vm_info['qemu_thread'][_queue + avg_flow * vm_index],
267 "reverse": _vm_info['qemu_thread'][_queue + avg_flow * vm_index + int(self.config['flows'])]
269 temp_flow['fwd_thread'] = forward_core[_direct]
271 temp_flow['fwd_vhost'] = None
272 # nic interrupts info
273 temp_flow['src_recv_irq'] = src_nic_irq[j]
274 temp_flow['src_nic'] = src_nic
275 temp_flow['dst_send_irq'] = dst_nic_irq[j]
276 temp_flow['dst_nic'] = dst_nic
279 self.init_flows[_direct + '_' + _vm + '_' + str(_queue)] = copy.deepcopy(temp_flow)
281 src_nic_irq, dst_nic_irq = dst_nic_irq, src_nic_irq
283 send_port, recv_port = recv_port, send_port
284 src_nic, dst_nic = dst_nic, src_nic
285 src_macs, dst_macs = dst_macs, src_macs
286 src_ip_sections, dst_ip_sections = dst_ip_sections, src_ip_sections
287 # return sorted(self.init_flows.iteritems(), key=lambda d:d[0])
288 return self.init_flows
290 def mac_learning(self, flowa, flowb):
293 ret = self.send_instace.mac_learning(flowa, flowb)
296 def send_packet(self, flow):
298 # return a stream block handle
299 return self.send_instace.send_packet(flow)
301 def stop_flow(self, streamblock, flow):
303 return self.send_instace.stop_flow(streamblock, flow)
305 def catch_thread_info(self):
306 return self.host_instance.catch_thread_info()
308 def set_thread2flow(self, thread_info, flow):
309 flow['fwd_vhost'] = thread_info['fwd_vhost']
313 def flow_build(self):
314 for _direct in fwd[self.config['direct']]:
316 for _queue in range(1, int(self.config['queues']) + 1):
322 self.mac_learning(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)],
323 self.init_flows[reverse_dict[_direct] + '_' + _vm + '_' + str(_queue)])
324 streamblock = self.send_packet(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
326 result, thread_info = self.catch_thread_info()
327 thread_info = eval(thread_info)
328 self.stop_flow(streamblock, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
331 print("[ERROR]Catch the thread info failed.")
334 print("[ERROR]send flow failed error or get host thread info failed.")
336 # compare the got thread info to
337 if check_dict(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]):
338 self.set_thread2flow(thread_info, self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
339 print("[INFO]Flow %s_%s_%s : fwd_vhost %s src_recv_irq %s dst_send_irq %s"
340 % (_direct, _vm, _queue, thread_info['fwd_vhost'], thread_info['src_recv_irq'],
341 thread_info['dst_send_irq']))
342 print("%s" % (self.init_flows[_direct + '_' + _vm + '_' + str(_queue)]))
345 dst_ip_update(self.init_flows[_direct + '_' + _vm + '_' + str(_queue)])
346 return self.init_flows
348 def affinity_bind(self, aff_strategy):
349 # get the forward cores
357 # recognize the thread id
358 for flowname in sorted(self.init_flows.keys()):
359 tmp_thread = self.init_flows[flowname]['fwd_thread']
360 qemu_other = qemu_other + copy.deepcopy(self.init_flows[flowname]['qemu_thread_list'])
361 qemu_list.append(tmp_thread)
362 if self.init_flows[flowname]['direct'] == 'forward':
363 dst_vhost.append(self.init_flows[flowname]['fwd_vhost'])
364 src_irq.append(self.init_flows[flowname]['src_recv_irq'])
365 dst_irq.append(self.init_flows[flowname]['dst_send_irq'])
366 elif self.init_flows[flowname]['direct'] == 'reverse':
367 src_vhost.append(self.init_flows[flowname]['fwd_vhost'])
368 dst_irq.append(self.init_flows[flowname]['src_recv_irq'])
369 src_irq.append(self.init_flows[flowname]['dst_send_irq'])
371 qemu_list = sorted({}.fromkeys(qemu_list).keys())
372 src_vhost = sorted({}.fromkeys(src_vhost).keys())
373 dst_vhost = sorted({}.fromkeys(dst_vhost).keys())
374 src_irq = sorted({}.fromkeys(src_irq).keys())
375 dst_irq = sorted({}.fromkeys(dst_irq).keys())
377 # get the qemu thread except the forward core
378 qemu_other = sorted({}.fromkeys(qemu_other).keys())
381 # get the bind strategy
382 handle = ConfigParser.ConfigParser()
383 handle.read(self.config['strategyfile'])
385 qemu_numa = handle.get('strategy' + self.config['strategy'], 'qemu_numa')
386 src_vhost_numa = handle.get('strategy' + self.config['strategy'], 'src_vhost_numa')
387 dst_vhost_numa = handle.get('strategy' + self.config['strategy'], 'dst_vhost_numa')
388 src_irq_numa = handle.get('strategy' + self.config['strategy'], 'src_irq_numa')
389 dst_irq_numa = handle.get('strategy' + self.config['strategy'], 'dst_irq_numa')
390 loan_numa = handle.get('strategy' + self.config['strategy'], 'loan_numa')
392 print("[ERROR]Parse the strategy file failed or get the options failed.")
394 for value in [qemu_numa, src_vhost_numa, dst_vhost_numa, src_irq_numa, dst_irq_numa, loan_numa]:
395 if value is not None or value == '':
396 raise ValueError('some option in the strategy file is none.')
397 # cores mapping thread
398 numa_topo = self.host_instance.get_numa_core()
399 numa_topo = eval(numa_topo)
400 # first check the cores number
402 # order src_irq dst_irq src_vhost dst_vhost qemu_list
403 for node in numa_topo.keys():
404 numa_topo[node]['process'] = []
405 if 'node' + src_irq_numa == node:
406 numa_topo[node]['process'] = numa_topo[node]['process'] + src_irq
407 if 'node' + dst_irq_numa == node:
408 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_irq
409 if 'node' + src_vhost_numa == node:
410 numa_topo[node]['process'] = numa_topo[node]['process'] + src_vhost
411 if 'node' + dst_vhost_numa == node:
412 numa_topo[node]['process'] = numa_topo[node]['process'] + dst_vhost
413 if 'node' + qemu_numa == node:
414 numa_topo[node]['process'] = numa_topo[node]['process'] + qemu_list
416 for node in numa_topo.keys():
417 if len(numa_topo[node]['process']) > len(numa_topo[node]['phy_cores']):
419 diff = len(numa_topo[node]['process']) - len(numa_topo[node]['phy_cores'])
421 numa_topo['node' + loan_numa]['process'] = numa_topo['node' + loan_numa]['process'] + copy.deepcopy(
422 numa_topo[node]['process'][-diff:])
423 cores_str = _tranfer_array_to_range(numa_topo['node' + loan_numa]['phy_cores'][diff:])
424 loan_cores = ','.join([loan_cores, cores_str])
425 numa_topo[node]['process'] = numa_topo[node]['process'][0:-diff]
426 loan_cores = loan_cores[1:]
428 for proc_loan in qemu_other:
429 loan_bind_list[proc_loan] = loan_cores
432 for node in numa_topo.keys():
433 for i in range(len(numa_topo[node]['process'])):
434 bind_list[numa_topo[node]['process'][i]] = str(numa_topo[node]['phy_cores'][i])
435 bind_list.update(loan_bind_list)
436 for key in bind_list.keys():
437 self.host_instance.bind_cpu(bind_list[key], key)
441 def testrun(self, suite):
442 global forward_init_flows, reverse_init_flows
444 forward_init_flows = {}
445 reverse_init_flows = {}
446 for key in self.init_flows.keys():
447 if self.init_flows[key]['direct'] == "forward":
448 forward_init_flows[key] = self.init_flows[key]
449 elif self.init_flows[key]['direct'] == "reverse":
450 reverse_init_flows[key] = self.init_flows[key]
451 forward_init_flows = str(forward_init_flows)
452 reverse_init_flows = str(reverse_init_flows)
454 print("[ERROR]init the forward and reverse flow failed.")
456 if suite == "throughput":
457 print("[INFO]!!!!!!!!!!!!!!!Now begin to throughput test")
458 ret, result = self.send_instace.run_rfc2544_throughput(forward_init_flows, reverse_init_flows)
459 elif suite == "frameloss":
460 print("[INFO]!!!!!!!!!!!1!!!Now begin to frameloss test")
461 ret, result = self.send_instace.run_rfc2544_frameloss(forward_init_flows, reverse_init_flows)