3 from spirent import stcPython
5 class Spirent_Tools(object):
8 """This class provide API of Spirent
11 super(Spirent_Tools, self).__init__()
13 def send_packet(self,flow):
19 self.baseAPI.stc_perform(' ResetConfig -config system1')
20 self.baseAPI.stc_init()
22 project = self.baseAPI.stc_create_project()
24 port_handle = self.baseAPI.stc_create_port(project)
26 slot = flow['send_port'].split('/')[0]
27 port = flow['send_port'].split('/')[1]
28 self.baseAPI.stc_config_port_location(port_handle,flow['tester_ip'],slot,port)
30 streamblock_handle = self.baseAPI.stc_create_streamblock(
31 port_name = port_handle,
33 vlan_tag = flow['vlan'],
34 srcMac = flow['src_mac'],
35 dstMac = flow['dst_mac'],
36 sourceAddr = flow['src_ip'],
37 destAddr =flow['dst_ip']
40 port_list = [port_handle]
41 self.baseAPI.stc_attach_ports(port_list)
43 streamblock_list = [streamblock_handle]
44 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
45 return str(streamblock_list).strip('[]')
47 print("[ERROR]create stream block and send packet failed.")
50 def mac_learning(self,flowA,flowB):
59 self.baseAPI.stc_perform(' ResetConfig -config system1')
60 self.baseAPI.stc_init()
62 project = self.baseAPI.stc_create_project()
63 #create port and config port
64 for flow in [ flowA,flowB ]:
65 flow['port_handle'] = self.baseAPI.stc_create_port(project)
66 tmp_test_ip = flow['tester_ip']
67 tmp_slot = flow['send_port'].split('/')[0]
68 tmp_port = flow['send_port'].split('/')[1]
69 self.baseAPI.stc_config_port_location(flow['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
71 flow['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = flow['port_handle'],
73 vlan_tag = flow['vlan'],
74 srcMac = flow['src_mac'],
75 dstMac = flow['dst_mac'],
76 sourceAddr = flow['src_ip'],
77 destAddr =flow['dst_ip'])
78 #create port and stream block list
79 port_list.append(flow['port_handle'])
80 streamblock_list.append(flow['streamblock'])
83 self.baseAPI.stc_attach_ports(port_list)
85 flag = self.baseAPI.stc_streamblock_start(streamblock_list)
89 self.baseAPI.stc_streamblock_stop(streamblock_list)
90 # delete streamblock and release port
91 for flow in [ flowA,flowB ]:
92 tmp_test_ip = flow['tester_ip']
93 tmp_slot = flow['send_port'].split('/')[0]
94 tmp_port = flow['send_port'].split('/')[1]
95 self.baseAPI.stc_delete(flow['streamblock'])
96 self.baseAPI.stc_release('%s/%s/%s' %(tmp_test_ip,tmp_slot,tmp_port))
98 self.baseAPI.stc_delete('project1')
99 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
102 print("[ERROR]mac learning failed")
105 def stop_flow(self,streamblock_list,flow):
107 streamblock_list = streamblock_list.strip('\'').split(',')
108 #stop streamblock list
110 ret = self.baseAPI.stc_streamblock_stop(streamblock_list)
112 print("[ERROR]Stop the streamblock list failed.")
116 for streamblock in streamblock_list :
117 ret = self.baseAPI.stc_delete(streamblock)
119 print("[ERROR]delete stream block.")
123 slot = flow['send_port'].split('/')[0]
124 port = flow['send_port'].split('/')[1]
125 ret = self.baseAPI.stc_release('%s/%s/%s' %(flow['tester_ip'],slot,port))
127 print("[ERROR]Release port failed")
131 ret = self.baseAPI.stc_delete('project1')
132 ret = self.baseAPI.stc_perform('ResetConfig -config system1')
135 print("[ERROR]Delete project1 failed.")
138 def run_rfc2544_throughput(self,forward_init_flows,reverse_init_flows):
142 forward_init_flows = eval(forward_init_flows)
143 reverse_init_flows = eval(reverse_init_flows)
145 self.baseAPI.stc_perform(' ResetConfig -config system1')
146 self.baseAPI.stc_init()
148 project = self.baseAPI.stc_create_project()
150 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
152 forward_port_handle = self.baseAPI.stc_create_port(project)
153 reverse_port_handle = self.baseAPI.stc_create_port(project)
154 #create forward flow streamblock
155 for key in forward_init_flows.keys():
156 forward_init_flows[key]['port_handle'] = forward_port_handle
157 tmp_test_ip = forward_init_flows[key]['tester_ip']
158 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
159 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
160 self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
162 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
163 vlan_tag = forward_init_flows[key]['vlan'],
164 ExpectedRxPort = reverse_port_handle,
165 srcMac = forward_init_flows[key]['src_mac'],
166 dstMac = forward_init_flows[key]['dst_mac'],
167 sourceAddr = forward_init_flows[key]['src_ip'],
168 destAddr = forward_init_flows[key]['dst_ip'])
169 #create reverse flow streamblock
170 for key in reverse_init_flows.keys():
171 reverse_init_flows[key]['port_handle'] = reverse_port_handle
172 tmp_test_ip = reverse_init_flows[key]['tester_ip']
173 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
174 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
175 self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
177 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
178 vlan_tag = reverse_init_flows[key]['vlan'],
179 ExpectedRxPort = forward_port_handle,
180 srcMac = reverse_init_flows[key]['src_mac'],
181 dstMac = reverse_init_flows[key]['dst_mac'],
182 sourceAddr = reverse_init_flows[key]['src_ip'],
183 destAddr = reverse_init_flows[key]['dst_ip'])
184 #Create the RFC 2544 throughput test
185 throughput_config = self.baseAPI.stc_create('Rfc2544ThroughputConfig -under ',project,
186 '-AcceptableFrameLoss 0.01',
188 '-DurationSeconds 60',
189 '-SearchMode BINARY',
191 '-RateUpperLimit 100',
193 '-UseExistingStreamBlocks True',
194 '-EnableLearning False',
195 '-FrameSizeIterationMode CUSTOM',
196 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
198 '-EnableJitterMeasurement TRUE'
203 streamblock_list = '" '
204 for key in forward_init_flows.keys():
205 streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
206 for key in reverse_init_flows.keys():
207 streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
208 streamblock_list = streamblock_list+'"'
210 throughput_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+throughput_config+' -Active TRUE -LocalActive TRUE')
211 self.baseAPI.stc_config(throughput_sbProfile,'-StreamBlockList '+streamblock_list)
212 self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',throughput_config)
214 #attach the port before testing
215 port_list = [ forward_port_handle,reverse_port_handle]
216 self.baseAPI.stc_attach_ports(port_list)
218 #stc apply and begin to sequence test
219 self.baseAPI.stc_apply()
220 self.baseAPI.stc_perform("SequencerStart")
223 self.baseAPI.stc_waituntilcomplete()
226 resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
227 results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544ThroughputTestResultDetailedSummaryView')
229 return True,results_dict
231 def run_rfc2544_frameloss(self,forward_init_flows,reverse_init_flows):
235 forward_init_flows = eval(forward_init_flows)
236 reverse_init_flows = eval(reverse_init_flows)
238 self.baseAPI.stc_perform(' ResetConfig -config system1')
239 self.baseAPI.stc_init()
241 project = self.baseAPI.stc_create_project()
243 seq_handle = self.baseAPI.stc_create('Sequencer -under %s' %(project))
245 forward_port_handle = self.baseAPI.stc_create_port(project)
246 reverse_port_handle = self.baseAPI.stc_create_port(project)
247 #create forward flow streamblock
248 for key in forward_init_flows.keys():
249 forward_init_flows[key]['port_handle'] = forward_port_handle
250 tmp_test_ip = forward_init_flows[key]['tester_ip']
251 tmp_slot = forward_init_flows[key]['send_port'].split('/')[0]
252 tmp_port = forward_init_flows[key]['send_port'].split('/')[1]
253 self.baseAPI.stc_config_port_location(forward_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
255 forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = forward_init_flows[key]['port_handle'],
256 vlan_tag = forward_init_flows[key]['vlan'],
257 ExpectedRxPort = reverse_port_handle,
258 srcMac = forward_init_flows[key]['src_mac'],
259 dstMac = forward_init_flows[key]['dst_mac'],
260 sourceAddr = forward_init_flows[key]['src_ip'],
261 destAddr = forward_init_flows[key]['dst_ip'])
262 #create reverse flow streamblock
263 for key in reverse_init_flows.keys():
264 reverse_init_flows[key]['port_handle'] = reverse_port_handle
265 tmp_test_ip = reverse_init_flows[key]['tester_ip']
266 tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0]
267 tmp_port = reverse_init_flows[key]['send_port'].split('/')[1]
268 self.baseAPI.stc_config_port_location(reverse_init_flows[key]['port_handle'],tmp_test_ip,tmp_slot,tmp_port)
270 reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock(port_name = reverse_init_flows[key]['port_handle'],
271 vlan_tag = reverse_init_flows[key]['vlan'],
272 ExpectedRxPort = forward_port_handle,
273 srcMac = reverse_init_flows[key]['src_mac'],
274 dstMac = reverse_init_flows[key]['dst_mac'],
275 sourceAddr = reverse_init_flows[key]['src_ip'],
276 destAddr = reverse_init_flows[key]['dst_ip'])
277 #Create the RFC 2544 frameloss test
278 frameloss_config = self.baseAPI.stc_create('Rfc2544FrameLossConfig -under ',project,
280 '-DurationSeconds 60 ',
281 '-LoadUnits PERCENT_LINE_RATE ',
283 '-CustomLoadList 100 '
284 '-UseExistingStreamBlocks True ',
285 '-EnableLearning False ',
286 '-FrameSizeIterationMode CUSTOM ',
287 '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"',
289 '-EnableJitterMeasurement TRUE'
294 streamblock_list = '" '
295 for key in forward_init_flows.keys():
296 streamblock_list = streamblock_list+forward_init_flows[key]['streamblock']+' '
297 for key in reverse_init_flows.keys():
298 streamblock_list = streamblock_list+reverse_init_flows[key]['streamblock']+' '
299 streamblock_list = streamblock_list+'"'
301 frameloss_sbProfile= self.baseAPI.stc_create('Rfc2544StreamBlockProfile -under '+frameloss_config+' -Active TRUE -LocalActive TRUE')
302 self.baseAPI.stc_config(frameloss_sbProfile,'-StreamBlockList '+streamblock_list)
303 self.baseAPI.stc_perform('ExpandBenchmarkConfigCommand','-config ',frameloss_config)
305 #attach the port before testing
306 port_list = [ forward_port_handle,reverse_port_handle]
307 self.baseAPI.stc_attach_ports(port_list)
309 #stc apply and begin to sequence test
310 self.baseAPI.stc_apply()
311 self.baseAPI.stc_perform("SequencerStart")
314 self.baseAPI.stc_waituntilcomplete()
317 resultsdb = self.baseAPI.stc_get("system1.project.TestResultSetting", "-CurrentResultFileName")
318 results_dict = self.baseAPI.stc_perform('QueryResult','-DatabaseConnectionString',resultsdb,'-ResultPath RFC2544FrameLossTestResultDetailedSummaryView')
321 return True,results_dict
323 def run_rfc2544_latency(self,forward_init_flows,reverse_init_flows):