1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from spirent import stcPython
14 class Spirent_Tools(object):
17 """This class provide API of Spirent
20 super(Spirent_Tools, self).__init__()
22 def send_packet(self,flow):
28 self.baseAPI.stc_perform(' ResetConfig -config system1')
29 self.baseAPI.stc_init()
31 project = self.baseAPI.stc_create_project()
33 port_handle = self.baseAPI.stc_create_port(project)
35 slot = flow['send_port'].split('/')[0]
36 port = flow['send_port'].split('/')[1]
37 self.baseAPI.stc_config_port_location(port_handle,flow['tester_ip'],slot,port)
39 streamblock_handle = self.baseAPI.stc_create_streamblock(
40 port_name = port_handle,
42 vlan_tag = flow['vlan'],
43 srcMac = flow['src_mac'],
44 dstMac = flow['dst_mac'],
45 sourceAddr = flow['src_ip'],
46 destAddr =flow['dst_ip']
49 port_list = [port_handle]
50 self.baseAPI.stc_attach_ports(port_list)
52 streamblock_list = [streamblock_handle]
53 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
54 return str(streamblock_list).strip('[]')
56 print("[ERROR]create stream block and send packet failed.")
59 def mac_learning(self,flowA,flowB):
68 self.baseAPI.stc_perform(' ResetConfig -config system1')
69 self.baseAPI.stc_init()
71 project = self.baseAPI.stc_create_project()
72 #create port and config port
73 for flow in [ flowA,flowB ]:
74 flow['port_handle'] = self.baseAPI.stc_create_port(project)
75 tmp_test_ip = flow['tester_ip']
76 tmp_slot = flow['send_port'].split('/')[0]
77 tmp_port = flow['send_port'].split('/')[1]
78 self.baseAPI.stc_config_port_location(flow['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
80 flow['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = flow['port_handle'],
82 vlan_tag = flow['vlan'],
83 srcMac = flow['src_mac'],
84 dstMac = flow['dst_mac'],
85 sourceAddr = flow['src_ip'],
86 destAddr =flow['dst_ip'])
87 #create port and stream block list
88 port_list.append(flow['port_handle'])
89 streamblock_list.append(flow['streamblock'])
92 self.baseAPI.stc_attach_ports(port_list)
94 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
98 self.baseAPI.stc_streamblock_stop(streamblock_list)
99 # delete streamblock and release port
100 for flow in [ flowA,flowB ]:
101 tmp_test_ip = flow['tester_ip']
102 tmp_slot = flow['send_port'].split('/')[0]
103 tmp_port = flow['send_port'].split('/')[1]
104 self.baseAPI.stc_delete(flow['streamblock'])
105 self.baseAPI.stc_release('%s/%s/%s' %(tmp_test_ip,tmp_slot,tmp_port))
107 self.baseAPI.stc_delete('project1')
108 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
111 print("[ERROR]mac learning failed")
114 def stop_flow(self,streamblock_list,flow):
116 streamblock_list = streamblock_list.strip('\'').split(',')
117 #stop streamblock list
119 ret = self.baseAPI.stc_streamblock_stop(streamblock_list)
121 print("[ERROR]Stop the streamblock list failed.")
125 for streamblock in streamblock_list :
126 ret = self.baseAPI.stc_delete(streamblock)
128 print("[ERROR]delete stream block.")
132 slot = flow['send_port'].split('/')[0]
133 port = flow['send_port'].split('/')[1]
134 ret = self.baseAPI.stc_release('%s/%s/%s' %(flow['tester_ip'],slot,port))
136 print("[ERROR]Release port failed")
140 ret = self.baseAPI.stc_delete('project1')
141 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
144 print("[ERROR]Delete project1 failed.")
147 def run_rfc2544_throughput(self,forward_init_flows,reverse_init_flows):
151 forward_init_flows = eval(forward_init_flows)
152 reverse_init_flows = eval(reverse_init_flows)
154 self.baseAPI.stc_perform(' ResetConfig -config system1')
155 self.baseAPI.stc_init()
157 project = self.baseAPI.stc_create_project()
159 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
161 forward_port_handle = self.baseAPI.stc_create_port(project)
162 reverse_port_handle = self.baseAPI.stc_create_port(project)
163 #create forward flow streamblock
164 for key in forward_init_flows.keys():
165 forward_init_flows[key]['port_handle'] = forward_port_handle
166 tmp_test_ip = forward_init_flows[key]['tester_ip']
167 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
168 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
169 self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
171 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
172 vlan_tag = forward_init_flows[key]['vlan'],
173 ExpectedRxPort = reverse_port_handle,
174 srcMac = forward_init_flows[key]['src_mac'],
175 dstMac = forward_init_flows[key]['dst_mac'],
176 sourceAddr = forward_init_flows[key]['src_ip'],
177 destAddr = forward_init_flows[key]['dst_ip'])
178 #create reverse flow streamblock
179 for key in reverse_init_flows.keys():
180 reverse_init_flows[key]['port_handle'] = reverse_port_handle
181 tmp_test_ip = reverse_init_flows[key]['tester_ip']
182 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
183 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
184 self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
186 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
187 vlan_tag = reverse_init_flows[key]['vlan'],
188 ExpectedRxPort = forward_port_handle,
189 srcMac = reverse_init_flows[key]['src_mac'],
190 dstMac = reverse_init_flows[key]['dst_mac'],
191 sourceAddr = reverse_init_flows[key]['src_ip'],
192 destAddr = reverse_init_flows[key]['dst_ip'])
193 #Create the RFC 2544 throughput test
194 throughput_config = self.baseAPI.stc_create('Rfc2544ThroughputConfig -under ',project,
195 '-AcceptableFrameLoss 0.01',
197 '-DurationSeconds 60',
198 '-SearchMode BINARY',
200 '-RateUpperLimit 100',
202 '-UseExistingStreamBlocks True',
203 '-EnableLearning False',
204 '-FrameSizeIterationMode CUSTOM',
205 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
207 '-EnableJitterMeasurement TRUE'
212 streamblock_list = '" '
213 for key in forward_init_flows.keys():
214 streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
215 for key in reverse_init_flows.keys():
216 streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
217 streamblock_list = streamblock_list+'"'
219 throughput_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+throughput_config+' -Active TRUE -LocalActive TRUE')
220 self.baseAPI.stc_config(throughput_sbProfile,'-StreamBlockList '+streamblock_list)
221 self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',throughput_config)
223 #attach the port before testing
224 port_list = [ forward_port_handle,reverse_port_handle]
225 self.baseAPI.stc_attach_ports(port_list)
227 #stc apply and begin to sequence test
228 self.baseAPI.stc_apply()
229 self.baseAPI.stc_perform("SequencerStart")
232 self.baseAPI.stc_waituntilcomplete()
235 resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
236 results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544ThroughputTestResultDetailedSummaryView')
238 return True,results_dict
240 def run_rfc2544_frameloss(self,forward_init_flows,reverse_init_flows):
244 forward_init_flows = eval(forward_init_flows)
245 reverse_init_flows = eval(reverse_init_flows)
247 self.baseAPI.stc_perform(' ResetConfig -config system1')
248 self.baseAPI.stc_init()
250 project = self.baseAPI.stc_create_project()
252 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
254 forward_port_handle = self.baseAPI.stc_create_port(project)
255 reverse_port_handle = self.baseAPI.stc_create_port(project)
256 #create forward flow streamblock
257 for key in forward_init_flows.keys():
258 forward_init_flows[key]['port_handle'] = forward_port_handle
259 tmp_test_ip = forward_init_flows[key]['tester_ip']
260 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
261 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
262 self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
264 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
265 vlan_tag = forward_init_flows[key]['vlan'],
266 ExpectedRxPort = reverse_port_handle,
267 srcMac = forward_init_flows[key]['src_mac'],
268 dstMac = forward_init_flows[key]['dst_mac'],
269 sourceAddr = forward_init_flows[key]['src_ip'],
270 destAddr = forward_init_flows[key]['dst_ip'])
271 #create reverse flow streamblock
272 for key in reverse_init_flows.keys():
273 reverse_init_flows[key]['port_handle'] = reverse_port_handle
274 tmp_test_ip = reverse_init_flows[key]['tester_ip']
275 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
276 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
277 self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
279 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
280 vlan_tag = reverse_init_flows[key]['vlan'],
281 ExpectedRxPort = forward_port_handle,
282 srcMac = reverse_init_flows[key]['src_mac'],
283 dstMac = reverse_init_flows[key]['dst_mac'],
284 sourceAddr = reverse_init_flows[key]['src_ip'],
285 destAddr = reverse_init_flows[key]['dst_ip'])
286 #Create the RFC 2544 frameloss test
287 frameloss_config = self.baseAPI.stc_create('Rfc2544FrameLossConfig -under ',project,
289 '-DurationSeconds 60 ',
290 '-LoadUnits PERCENT_LINE_RATE ',
292 '-CustomLoadList 100 '
293 '-UseExistingStreamBlocks True ',
294 '-EnableLearning False ',
295 '-FrameSizeIterationMode CUSTOM ',
296 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
298 '-EnableJitterMeasurement TRUE'
303 streamblock_list = '" '
304 for key in forward_init_flows.keys():
305 streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
306 for key in reverse_init_flows.keys():
307 streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
308 streamblock_list = streamblock_list+'"'
310 frameloss_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+frameloss_config+' -Active TRUE -LocalActive TRUE')
311 self.baseAPI.stc_config(frameloss_sbProfile,'-StreamBlockList '+streamblock_list)
312 self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',frameloss_config)
314 #attach the port before testing
315 port_list = [ forward_port_handle,reverse_port_handle]
316 self.baseAPI.stc_attach_ports(port_list)
318 #stc apply and begin to sequence test
319 self.baseAPI.stc_apply()
320 self.baseAPI.stc_perform("SequencerStart")
323 self.baseAPI.stc_waituntilcomplete()
326 resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
327 results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544FrameLossTestResultDetailedSummaryView')
330 return True,results_dict
332 def run_rfc2544_latency(self,forward_init_flows,reverse_init_flows):