1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
12 from spirent import stcPython
15 class Spirent_Tools(object):
19 """This class provide API of Spirent
22 super(Spirent_Tools, self).__init__()
# Set up a single tester port from `flow`, create one stream block on it and
# start that stream block.
#
# flow: indexed below with the keys 'send_port', 'tester_ip', 'vlan',
#       'src_mac', 'dst_mac', 'src_ip' and 'dst_ip'. 'send_port' is split on
#       '/' and its first two fields are used as slot and port.
# Returns (on the visible return path): str() of the one-element stream block
#       list with the characters '[' and ']' stripped from both ends.
#
# NOTE(review): the line numbers embedded in this listing skip (25-29, 44,
# 50-51, 58, 60-61, ...), so some original lines are not visible here. In
# particular whatever separates the `return` from the error print is missing
# -- presumably a try/except; confirm against the full file.
# NOTE(review): self.baseAPI is not assigned in any visible line; presumably
# an instance of the imported stcPython -- TODO confirm.
24 def send_packet(self, flow):
# Send the 'ResetConfig' command for system1, then call stc_init().
30 self.baseAPI.stc_perform(' ResetConfig -config system1')
31 self.baseAPI.stc_init()
# Create a project and one port under it.
33 project = self.baseAPI.stc_create_project()
35 port_handle = self.baseAPI.stc_create_port(project)
# Assumes 'send_port' has the form '<slot>/<port>' (inferred from the split).
37 slot = flow['send_port'].split('/')[0]
38 port = flow['send_port'].split('/')[1]
39 self.baseAPI.stc_config_port_location(
40 port_handle, flow['tester_ip'], slot, port)
# Create the stream block from the flow's VLAN, MAC and IP fields.
# NOTE(review): embedded line 44 (between port_name and vlan_tag) and the
# lines closing this call (50-51) are not visible.
42 streamblock_handle = self.baseAPI.stc_create_streamblock(
43 port_name=port_handle,
45 vlan_tag=flow['vlan'],
46 srcMac=flow['src_mac'],
47 dstMac=flow['dst_mac'],
48 sourceAddr=flow['src_ip'],
49 destAddr=flow['dst_ip']
# Attach the port, then start the stream block.
52 port_list = [port_handle]
53 self.baseAPI.stc_attach_ports(port_list)
55 streamblock_list = [streamblock_handle]
# `flag` is not read in any visible line.
56 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
57 return str(streamblock_list).strip('[]')
# Failure message; the condition under which it is printed is not visible.
59 print("[ERROR]create stream block and send packet failed.")
# For each of flowA and flowB: create a port, configure its location and
# create a stream block; then attach the ports, start and stop the stream
# blocks, delete the stream blocks, release the ports, delete 'project1' and
# send 'ResetConfig' again.
#
# flowA, flowB: indexed with the keys 'tester_ip', 'send_port', 'vlan',
#       'src_mac', 'dst_mac', 'src_ip' and 'dst_ip'.
# Side effect: the entries 'port_handle' and 'streamblock' are written into
#       both flow dicts.
# No return statement is visible in this listing.
#
# NOTE(review): the embedded line numbers skip (63-70, 86, 95-96, 100-102,
# 111, 116-117, ...), so some original lines are not visible. Notably the
# initialisation of port_list / streamblock_list, whatever happens between
# start and stop, and the guard for the error print are missing from view.
62 def mac_learning(self, flowA, flowB):
# Send the 'ResetConfig' command for system1, then call stc_init().
71 self.baseAPI.stc_perform(' ResetConfig -config system1')
72 self.baseAPI.stc_init()
74 project = self.baseAPI.stc_create_project()
75 # create port and config port
76 for flow in [flowA, flowB]:
77 flow['port_handle'] = self.baseAPI.stc_create_port(project)
78 tmp_test_ip = flow['tester_ip']
# Assumes 'send_port' has the form '<slot>/<port>' (inferred from the split).
79 tmp_slot = flow['send_port'].split('/')[0]
80 tmp_port = flow['send_port'].split('/')[1]
81 self.baseAPI.stc_config_port_location(
82 flow['port_handle'], tmp_test_ip, tmp_slot, tmp_port)
# NOTE(review): embedded line 86 (between port_name and vlan_tag) is not
# visible.
84 flow['streamblock'] = self.baseAPI.stc_create_streamblock(
85 port_name=flow['port_handle'],
87 vlan_tag=flow['vlan'],
88 srcMac=flow['src_mac'],
89 dstMac=flow['dst_mac'],
90 sourceAddr=flow['src_ip'],
91 destAddr=flow['dst_ip'])
92 # create port and stream block list
# NOTE(review): port_list and streamblock_list are appended to here but are
# not initialised in any visible line.
93 port_list.append(flow['port_handle'])
94 streamblock_list.append(flow['streamblock'])
# Attach all collected ports, then start and stop the collected stream blocks.
97 self.baseAPI.stc_attach_ports(port_list)
# `flag` is not read in any visible line (lines 100-102 are not shown).
99 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
103 self.baseAPI.stc_streamblock_stop(streamblock_list)
104 # delete streamblock and release port
105 for flow in [flowA, flowB]:
106 tmp_test_ip = flow['tester_ip']
107 tmp_slot = flow['send_port'].split('/')[0]
108 tmp_port = flow['send_port'].split('/')[1]
109 self.baseAPI.stc_delete(flow['streamblock'])
# NOTE(review): embedded line 111 (the start of this argument) is not
# visible; only the (ip, slot, port) tuple that ends the call is shown.
110 self.baseAPI.stc_release(
112 (tmp_test_ip, tmp_slot, tmp_port))
# Deletes by the literal name 'project1', not by the `project` handle
# obtained above.
114 self.baseAPI.stc_delete('project1')
# `ret` is not read in any visible line.
115 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
# Failure message; the condition under which it is printed is not visible.
118 print("[ERROR]mac learning failed")
# Stop and delete the given stream blocks, release the port described by
# `flow`, delete 'project1' and send 'ResetConfig'.
#
# streamblock_list: a string; single quotes are stripped from both ends and
#       the remainder is split on ','. Presumably this is the string form
#       returned by send_packet -- TODO confirm against the caller.
# flow: indexed with the keys 'send_port' and 'tester_ip'.
# No return statement is visible in this listing.
#
# NOTE(review): the embedded line numbers skip (127, 134, 142, 144, 151-152,
# ...), so the conditions guarding each error print below -- and part of the
# stc_release argument -- are not visible; confirm against the full file.
121 def stop_flow(self, streamblock_list, flow):
# NOTE(review): strip() only removes quotes at the two ends of the whole
# string, so with more than one comma-separated entry the inner items would
# keep any quotes/spaces they carry -- verify the expected input format.
123 streamblock_list = streamblock_list.strip('\'').split(',')
124 # stop streamblock list
126 ret = self.baseAPI.stc_streamblock_stop(streamblock_list)
128 print("[ERROR]Stop the streamblock list failed.")
# Delete every stream block; `ret` is overwritten on each iteration.
132 for streamblock in streamblock_list:
133 ret = self.baseAPI.stc_delete(streamblock)
135 print("[ERROR]delete stream block.")
# Assumes 'send_port' has the form '<slot>/<port>' (inferred from the split).
139 slot = flow['send_port'].split('/')[0]
140 port = flow['send_port'].split('/')[1]
# NOTE(review): embedded line 142 (the start of this argument) is not
# visible; only the (ip, slot, port) tuple that ends the call is shown.
141 ret = self.baseAPI.stc_release(
143 (flow['tester_ip'], slot, port))
145 print("[ERROR]Release port failed")
# The result of stc_delete is immediately overwritten by the stc_perform
# result on the next line.
149 ret = self.baseAPI.stc_delete('project1')
150 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
153 print("[ERROR]Delete project1 failed.")
# Build an 'Rfc2544ThroughputConfig' test over a forward and a reverse port,
# run it through the sequencer and fetch the result summary.
#
# forward_init_flows, reverse_init_flows: strings that are passed to eval();
#       the results are used as dicts whose values are flow dicts indexed
#       with 'tester_ip', 'send_port', 'vlan', 'src_mac', 'dst_mac', 'src_ip'
#       and 'dst_ip'.
# Returns: the tuple (True, results_dict), where results_dict is whatever the
#       final stc_perform call returns.
#
# NOTE(review): the embedded line numbers skip (157-159, 209, 211, 214, 216,
# 221, 237, 242, 245-247, 264, 266, ...), so several arguments of the
# stc_create / stc_config / stc_perform calls below are not visible in this
# listing; do not treat the visible argument lists as complete.
156 def run_rfc2544_throughput(self, forward_init_flows, reverse_init_flows):
# NOTE(review): eval() executes arbitrary expressions. If these strings can
# come from an untrusted source, ast.literal_eval would be the safer choice
# -- confirm the callers before changing.
160 forward_init_flows = eval(forward_init_flows)
161 reverse_init_flows = eval(reverse_init_flows)
# Send the 'ResetConfig' command for system1, then call stc_init().
163 self.baseAPI.stc_perform(' ResetConfig -config system1')
164 self.baseAPI.stc_init()
166 project = self.baseAPI.stc_create_project()
# `seq_handle` is not read in any visible line.
168 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' % (project))
# One port handle per direction; every flow of a direction shares it.
170 forward_port_handle = self.baseAPI.stc_create_port(project)
171 reverse_port_handle = self.baseAPI.stc_create_port(project)
172 # create forward flow streamblock
173 for key in forward_init_flows.keys():
174 forward_init_flows[key]['port_handle'] = forward_port_handle
175 tmp_test_ip = forward_init_flows[key]['tester_ip']
176 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
177 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
# The shared forward port is (re)configured with this flow's location on
# every iteration.
178 self.baseAPI.stc_config_port_location(
179 forward_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port)
# Forward stream blocks name the reverse port as their ExpectedRxPort; the
# resulting handle is stored back into the flow dict under 'streamblock'.
181 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(
182 port_name=forward_init_flows[key]['port_handle'],
183 vlan_tag=forward_init_flows[key]['vlan'],
184 ExpectedRxPort=reverse_port_handle,
185 srcMac=forward_init_flows[key]['src_mac'],
186 dstMac=forward_init_flows[key]['dst_mac'],
187 sourceAddr=forward_init_flows[key]['src_ip'],
188 destAddr=forward_init_flows[key]['dst_ip'])
189 # create reverse flow streamblock
190 for key in reverse_init_flows.keys():
191 reverse_init_flows[key]['port_handle'] = reverse_port_handle
192 tmp_test_ip = reverse_init_flows[key]['tester_ip']
193 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
194 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
195 self.baseAPI.stc_config_port_location(
196 reverse_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port)
# Mirror of the forward loop: reverse stream blocks expect to be received on
# the forward port.
198 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(
199 port_name=reverse_init_flows[key]['port_handle'],
200 vlan_tag=reverse_init_flows[key]['vlan'],
201 ExpectedRxPort=forward_port_handle,
202 srcMac=reverse_init_flows[key]['src_mac'],
203 dstMac=reverse_init_flows[key]['dst_mac'],
204 sourceAddr=reverse_init_flows[key]['src_ip'],
205 destAddr=reverse_init_flows[key]['dst_ip'])
206 # Create the RFC 2544 throughput test
# Each option is passed as its own string argument.
# NOTE(review): the arguments on embedded lines 209, 211, 214, 216 and 221
# (including whatever follows '-under ') are not visible.
207 throughput_config = self.baseAPI.stc_create(
208 'Rfc2544ThroughputConfig -under ',
210 '-AcceptableFrameLoss 0.01',
212 '-DurationSeconds 60',
213 '-SearchMode BINARY',
215 '-RateUpperLimit 100',
217 '-UseExistingStreamBlocks True',
218 '-EnableLearning False',
219 '-FrameSizeIterationMode CUSTOM',
220 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
222 '-EnableJitterMeasurement TRUE')
# Build one double-quoted, space-separated string of all stream block
# handles: forward flows first, then reverse flows.
226 streamblock_list = '" '
227 for key in forward_init_flows.keys():
228 streamblock_list = streamblock_list + \
229 forward_init_flows[key]['streamblock'] + ' '
230 for key in reverse_init_flows.keys():
231 streamblock_list = streamblock_list + \
232 reverse_init_flows[key]['streamblock'] + ' '
233 streamblock_list = streamblock_list + '"'
# Create the stream block profile and configure its '-StreamBlockList'.
# NOTE(review): embedded lines 237 and 242 (the operands of the '+'
# concatenations) are not visible.
235 throughput_sbProfile = self.baseAPI.stc_create(
236 'Rfc2544StreamBlockProfile -under ' +
238 ' -Active TRUE -LocalActive TRUE')
239 self.baseAPI.stc_config(
240 throughput_sbProfile,
241 '-StreamBlockList ' +
# NOTE(review): the remaining arguments of this call (embedded lines
# 245-247) are not visible.
243 self.baseAPI.stc_perform(
244 'ExpandBenchmarkConfigCommand',
248 # attach the port before testing
249 port_list = [forward_port_handle, reverse_port_handle]
250 self.baseAPI.stc_attach_ports(port_list)
252 # stc apply and begin to sequence test
253 self.baseAPI.stc_apply()
254 self.baseAPI.stc_perform("SequencerStart")
256 # wait until complete
257 self.baseAPI.stc_waituntilcomplete()
# Read the current result file name from the test result settings.
# `resultsdb` is not read in any visible line; presumably it is used on the
# hidden embedded line 266 after '-DatabaseConnectionString' -- TODO confirm.
260 resultsdb = self.baseAPI.stc_get(
261 "system1.project.TestResultSetting",
262 "-CurrentResultFileName")
# NOTE(review): the command name (embedded line 264) is not visible.
263 results_dict = self.baseAPI.stc_perform(
265 '-DatabaseConnectionString',
267 '-ResultPath RFC2544ThroughputTestResultDetailedSummaryView')
269 return True, results_dict
# Build an 'Rfc2544FrameLossConfig' test over a forward and a reverse port,
# run it through the sequencer and fetch the result summary.
#
# forward_init_flows, reverse_init_flows: strings that are passed to eval();
#       the results are used as dicts whose values are flow dicts indexed
#       with 'tester_ip', 'send_port', 'vlan', 'src_mac', 'dst_mac', 'src_ip'
#       and 'dst_ip'.
# Returns: the tuple (True, results_dict), where results_dict is whatever the
#       final stc_perform call returns.
#
# NOTE(review): the embedded line numbers skip (272-274, 324-325, 328, 334,
# 350, 353, 355, 358-360, 377, 379, ...), so several arguments of the
# stc_create / stc_config / stc_perform calls below are not visible in this
# listing; do not treat the visible argument lists as complete.
271 def run_rfc2544_frameloss(self, forward_init_flows, reverse_init_flows):
# NOTE(review): eval() executes arbitrary expressions. If these strings can
# come from an untrusted source, ast.literal_eval would be the safer choice
# -- confirm the callers before changing.
275 forward_init_flows = eval(forward_init_flows)
276 reverse_init_flows = eval(reverse_init_flows)
# Send the 'ResetConfig' command for system1, then call stc_init().
278 self.baseAPI.stc_perform(' ResetConfig -config system1')
279 self.baseAPI.stc_init()
281 project = self.baseAPI.stc_create_project()
# `seq_handle` is not read in any visible line.
283 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' % (project))
# One port handle per direction; every flow of a direction shares it.
285 forward_port_handle = self.baseAPI.stc_create_port(project)
286 reverse_port_handle = self.baseAPI.stc_create_port(project)
287 # create forward flow streamblock
288 for key in forward_init_flows.keys():
289 forward_init_flows[key]['port_handle'] = forward_port_handle
290 tmp_test_ip = forward_init_flows[key]['tester_ip']
291 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
292 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
# The shared forward port is (re)configured with this flow's location on
# every iteration.
293 self.baseAPI.stc_config_port_location(
294 forward_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port)
# Forward stream blocks name the reverse port as their ExpectedRxPort; the
# resulting handle is stored back into the flow dict under 'streamblock'.
296 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(
297 port_name=forward_init_flows[key]['port_handle'],
298 vlan_tag=forward_init_flows[key]['vlan'],
299 ExpectedRxPort=reverse_port_handle,
300 srcMac=forward_init_flows[key]['src_mac'],
301 dstMac=forward_init_flows[key]['dst_mac'],
302 sourceAddr=forward_init_flows[key]['src_ip'],
303 destAddr=forward_init_flows[key]['dst_ip'])
304 # create reverse flow streamblock
305 for key in reverse_init_flows.keys():
306 reverse_init_flows[key]['port_handle'] = reverse_port_handle
307 tmp_test_ip = reverse_init_flows[key]['tester_ip']
308 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
309 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
310 self.baseAPI.stc_config_port_location(
311 reverse_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port)
# Mirror of the forward loop: reverse stream blocks expect to be received on
# the forward port.
313 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(
314 port_name=reverse_init_flows[key]['port_handle'],
315 vlan_tag=reverse_init_flows[key]['vlan'],
316 ExpectedRxPort=forward_port_handle,
317 srcMac=reverse_init_flows[key]['src_mac'],
318 dstMac=reverse_init_flows[key]['dst_mac'],
319 sourceAddr=reverse_init_flows[key]['src_ip'],
320 destAddr=reverse_init_flows[key]['dst_ip'])
321 # Create the RFC 2544 frameloss test
# NOTE(review): the arguments on embedded lines 324-325, 328 and 334
# (including whatever follows '-under ') are not visible.
322 frameloss_config = self.baseAPI.stc_create(
323 'Rfc2544FrameLossConfig -under ',
326 '-DurationSeconds 60 ',
327 '-LoadUnits PERCENT_LINE_RATE ',
# NOTE(review): the next literal has no trailing comma, so as written it is
# joined with the following literal into the single argument
# '-CustomLoadList 100 -UseExistingStreamBlocks True ' -- confirm whether
# that is intended.
329 '-CustomLoadList 100 '
330 '-UseExistingStreamBlocks True ',
331 '-EnableLearning False ',
332 '-FrameSizeIterationMode CUSTOM ',
333 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
335 '-EnableJitterMeasurement TRUE')
# Build one double-quoted, space-separated string of all stream block
# handles: forward flows first, then reverse flows.
339 streamblock_list = '" '
340 for key in forward_init_flows.keys():
341 streamblock_list = streamblock_list + \
342 forward_init_flows[key]['streamblock'] + ' '
343 for key in reverse_init_flows.keys():
344 streamblock_list = streamblock_list + \
345 reverse_init_flows[key]['streamblock'] + ' '
346 streamblock_list = streamblock_list + '"'
# Create the stream block profile and configure its '-StreamBlockList'.
# NOTE(review): embedded lines 350, 353 and 355 (the profile's parent, the
# first stc_config argument and the value appended to '-StreamBlockList ')
# are not visible.
348 frameloss_sbProfile = self.baseAPI.stc_create(
349 'Rfc2544StreamBlockProfile -under ' +
351 ' -Active TRUE -LocalActive TRUE')
352 self.baseAPI.stc_config(
354 '-StreamBlockList ' +
# NOTE(review): the remaining arguments of this call (embedded lines
# 358-360) are not visible.
356 self.baseAPI.stc_perform(
357 'ExpandBenchmarkConfigCommand',
361 # attach the port before testing
362 port_list = [forward_port_handle, reverse_port_handle]
363 self.baseAPI.stc_attach_ports(port_list)
365 # stc apply and begin to sequence test
366 self.baseAPI.stc_apply()
367 self.baseAPI.stc_perform("SequencerStart")
369 # wait until complete
370 self.baseAPI.stc_waituntilcomplete()
# Read the current result file name from the test result settings.
# `resultsdb` is not read in any visible line; presumably it is used on the
# hidden embedded line 379 after '-DatabaseConnectionString' -- TODO confirm.
373 resultsdb = self.baseAPI.stc_get(
374 "system1.project.TestResultSetting",
375 "-CurrentResultFileName")
# NOTE(review): the command name (embedded line 377) is not visible.
376 results_dict = self.baseAPI.stc_perform(
378 '-DatabaseConnectionString',
380 '-ResultPath RFC2544FrameLossTestResultDetailedSummaryView')
383 return True, results_dict
385 def run_rfc2544_latency(self, forward_init_flows, reverse_init_flows):