############################################################################## # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others. # # All rights reserved. This program and the accompanying materials # are made available under the terms of the Apache License, Version 2.0 # which accompanies this distribution, and is available at # http://www.apache.org/licenses/LICENSE-2.0 ############################################################################## import time from spirent import stcPython class Spirent_Tools(object): baseAPI = stcPython() def __init__(self): """This class provide API of Spirent """ super(Spirent_Tools, self).__init__() def send_packet(self, flow): try: #import pdb # pdb.set_trace() flow = eval(flow) # stc init action self.baseAPI.stc_perform(' ResetConfig -config system1') self.baseAPI.stc_init() # create project project = self.baseAPI.stc_create_project() # create port port_handle = self.baseAPI.stc_create_port(project) # config port slot = flow['send_port'].split('/')[0] port = flow['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( port_handle, flow['tester_ip'], slot, port) # create streamblock streamblock_handle = self.baseAPI.stc_create_streamblock( port_name=port_handle, ExpectedRxPort='', vlan_tag=flow['vlan'], srcMac=flow['src_mac'], dstMac=flow['dst_mac'], sourceAddr=flow['src_ip'], destAddr=flow['dst_ip'] ) # attach port port_list = [port_handle] self.baseAPI.stc_attach_ports(port_list) # start streamblock streamblock_list = [streamblock_handle] flag = self.baseAPI.stc_streamblock_start(streamblock_list) return str(streamblock_list).strip('[]') except: print("[ERROR]create stream block and send packet failed.") return False def mac_learning(self, flowA, flowB): try: #import pdb # pdb.set_trace() flowA = eval(flowA) flowB = eval(flowB) port_list = [] streamblock_list = [] # stc init action self.baseAPI.stc_perform(' ResetConfig -config system1') self.baseAPI.stc_init() # create project project = self.baseAPI.stc_create_project() # create port and config port for flow in [flowA, flowB]: flow['port_handle'] = self.baseAPI.stc_create_port(project) tmp_test_ip = flow['tester_ip'] tmp_slot = flow['send_port'].split('/')[0] tmp_port = flow['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( flow['port_handle'], tmp_test_ip, tmp_slot, tmp_port) # create streamblock flow['streamblock'] = self.baseAPI.stc_create_streamblock( port_name=flow['port_handle'], ExpectedRxPort='', vlan_tag=flow['vlan'], srcMac=flow['src_mac'], dstMac=flow['dst_mac'], sourceAddr=flow['src_ip'], destAddr=flow['dst_ip']) # create port and stream block list port_list.append(flow['port_handle']) streamblock_list.append(flow['streamblock']) # attach port self.baseAPI.stc_attach_ports(port_list) # start streamblock flag = self.baseAPI.stc_streamblock_start(streamblock_list) # mac learning time.sleep(2) # stop stream block self.baseAPI.stc_streamblock_stop(streamblock_list) # delete streamblock and release port for flow in [flowA, flowB]: tmp_test_ip = flow['tester_ip'] tmp_slot = flow['send_port'].split('/')[0] tmp_port = flow['send_port'].split('/')[1] self.baseAPI.stc_delete(flow['streamblock']) self.baseAPI.stc_release( '%s/%s/%s' % (tmp_test_ip, tmp_slot, tmp_port)) # delete project self.baseAPI.stc_delete('project1') ret = self.baseAPI.stc_perform('ResetConfig -config system1') return True except: print("[ERROR]mac learning failed") return False def stop_flow(self, streamblock_list, flow): flow = eval(flow) streamblock_list = streamblock_list.strip('\'').split(',') # stop streamblock list try: ret = self.baseAPI.stc_streamblock_stop(streamblock_list) except: print("[ERROR]Stop the streamblock list failed.") return False # delete streamblock try: for streamblock in streamblock_list: ret = self.baseAPI.stc_delete(streamblock) except: print("[ERROR]delete stream block.") return False # release port try: slot = flow['send_port'].split('/')[0] port = flow['send_port'].split('/')[1] ret = self.baseAPI.stc_release( '%s/%s/%s' % (flow['tester_ip'], slot, port)) except: print("[ERROR]Release port failed") return False # delete project try: ret = self.baseAPI.stc_delete('project1') ret = self.baseAPI.stc_perform('ResetConfig -config system1') return True except: print("[ERROR]Delete project1 failed.") return False def run_rfc2544_throughput(self, forward_init_flows, reverse_init_flows): #import pdb # pdb.set_trace() # rebuild the flows forward_init_flows = eval(forward_init_flows) reverse_init_flows = eval(reverse_init_flows) # stc init action self.baseAPI.stc_perform(' ResetConfig -config system1') self.baseAPI.stc_init() # create project project = self.baseAPI.stc_create_project() # create sequencer seq_handle = self.baseAPI.stc_create('Sequencer -under %s' % (project)) # create port handle forward_port_handle = self.baseAPI.stc_create_port(project) reverse_port_handle = self.baseAPI.stc_create_port(project) # create forward flow streamblock for key in forward_init_flows.keys(): forward_init_flows[key]['port_handle'] = forward_port_handle tmp_test_ip = forward_init_flows[key]['tester_ip'] tmp_slot = forward_init_flows[key]['send_port'].split('/')[0] tmp_port = forward_init_flows[key]['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( forward_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port) # create streamblock forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock( port_name=forward_init_flows[key]['port_handle'], vlan_tag=forward_init_flows[key]['vlan'], ExpectedRxPort=reverse_port_handle, srcMac=forward_init_flows[key]['src_mac'], dstMac=forward_init_flows[key]['dst_mac'], sourceAddr=forward_init_flows[key]['src_ip'], destAddr=forward_init_flows[key]['dst_ip']) # create reverse flow streamblock for key in reverse_init_flows.keys(): reverse_init_flows[key]['port_handle'] = reverse_port_handle tmp_test_ip = reverse_init_flows[key]['tester_ip'] tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0] tmp_port = reverse_init_flows[key]['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( reverse_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port) # create streamblock reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock( port_name=reverse_init_flows[key]['port_handle'], vlan_tag=reverse_init_flows[key]['vlan'], ExpectedRxPort=forward_port_handle, srcMac=reverse_init_flows[key]['src_mac'], dstMac=reverse_init_flows[key]['dst_mac'], sourceAddr=reverse_init_flows[key]['src_ip'], destAddr=reverse_init_flows[key]['dst_ip']) # Create the RFC 2544 throughput test throughput_config = self.baseAPI.stc_create( 'Rfc2544ThroughputConfig -under ', project, '-AcceptableFrameLoss 0.01', '-NumOfTrials 1', '-DurationSeconds 60', '-SearchMode BINARY', '-RateLowerLimit 1', '-RateUpperLimit 100', '-RateInitial 10', '-UseExistingStreamBlocks True', '-EnableLearning False', '-FrameSizeIterationMode CUSTOM', '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"', '-LatencyType LIFO', '-EnableJitterMeasurement TRUE') #import pdb # pdb.set_trace() # list streamblocks streamblock_list = '" ' for key in forward_init_flows.keys(): streamblock_list = streamblock_list + \ forward_init_flows[key]['streamblock'] + ' ' for key in reverse_init_flows.keys(): streamblock_list = streamblock_list + \ reverse_init_flows[key]['streamblock'] + ' ' streamblock_list = streamblock_list + '"' throughput_sbProfile = self.baseAPI.stc_create( 'Rfc2544StreamBlockProfile -under ' + throughput_config + ' -Active TRUE -LocalActive TRUE') self.baseAPI.stc_config( throughput_sbProfile, '-StreamBlockList ' + streamblock_list) self.baseAPI.stc_perform( 'ExpandBenchmarkConfigCommand', '-config ', throughput_config) # attach the port before testing port_list = [forward_port_handle, reverse_port_handle] self.baseAPI.stc_attach_ports(port_list) # stc apply and begin to sequence test self.baseAPI.stc_apply() self.baseAPI.stc_perform("SequencerStart") # wait until complete self.baseAPI.stc_waituntilcomplete() # get result db resultsdb = self.baseAPI.stc_get( "system1.project.TestResultSetting", "-CurrentResultFileName") results_dict = self.baseAPI.stc_perform( 'QueryResult', '-DatabaseConnectionString', resultsdb, '-ResultPath RFC2544ThroughputTestResultDetailedSummaryView') # print results_dict return True, results_dict def run_rfc2544_frameloss(self, forward_init_flows, reverse_init_flows): #import pdb # pdb.set_trace() # rebuild the flows forward_init_flows = eval(forward_init_flows) reverse_init_flows = eval(reverse_init_flows) # stc init action self.baseAPI.stc_perform(' ResetConfig -config system1') self.baseAPI.stc_init() # create project project = self.baseAPI.stc_create_project() # create sequencer seq_handle = self.baseAPI.stc_create('Sequencer -under %s' % (project)) # create port handle forward_port_handle = self.baseAPI.stc_create_port(project) reverse_port_handle = self.baseAPI.stc_create_port(project) # create forward flow streamblock for key in forward_init_flows.keys(): forward_init_flows[key]['port_handle'] = forward_port_handle tmp_test_ip = forward_init_flows[key]['tester_ip'] tmp_slot = forward_init_flows[key]['send_port'].split('/')[0] tmp_port = forward_init_flows[key]['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( forward_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port) # create streamblock forward_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock( port_name=forward_init_flows[key]['port_handle'], vlan_tag=forward_init_flows[key]['vlan'], ExpectedRxPort=reverse_port_handle, srcMac=forward_init_flows[key]['src_mac'], dstMac=forward_init_flows[key]['dst_mac'], sourceAddr=forward_init_flows[key]['src_ip'], destAddr=forward_init_flows[key]['dst_ip']) # create reverse flow streamblock for key in reverse_init_flows.keys(): reverse_init_flows[key]['port_handle'] = reverse_port_handle tmp_test_ip = reverse_init_flows[key]['tester_ip'] tmp_slot = reverse_init_flows[key]['send_port'].split('/')[0] tmp_port = reverse_init_flows[key]['send_port'].split('/')[1] self.baseAPI.stc_config_port_location( reverse_init_flows[key]['port_handle'], tmp_test_ip, tmp_slot, tmp_port) # create streamblock reverse_init_flows[key]['streamblock'] = self.baseAPI.stc_create_streamblock( port_name=reverse_init_flows[key]['port_handle'], vlan_tag=reverse_init_flows[key]['vlan'], ExpectedRxPort=forward_port_handle, srcMac=reverse_init_flows[key]['src_mac'], dstMac=reverse_init_flows[key]['dst_mac'], sourceAddr=reverse_init_flows[key]['src_ip'], destAddr=reverse_init_flows[key]['dst_ip']) # Create the RFC 2544 frameloss test frameloss_config = self.baseAPI.stc_create( 'Rfc2544FrameLossConfig -under ', project, '-NumOfTrials 1 ', '-DurationSeconds 60 ', '-LoadUnits PERCENT_LINE_RATE ', '-LoadType CUSTOM ' '-CustomLoadList 100 ' '-UseExistingStreamBlocks True ', '-EnableLearning False ', '-FrameSizeIterationMode CUSTOM ', '-CustomFrameSizeList "70 128 256 512 1024 1280 1518"', '-LatencyType LIFO', '-EnableJitterMeasurement TRUE') #import pdb # pdb.set_trace() # list streamblocks streamblock_list = '" ' for key in forward_init_flows.keys(): streamblock_list = streamblock_list + \ forward_init_flows[key]['streamblock'] + ' ' for key in reverse_init_flows.keys(): streamblock_list = streamblock_list + \ reverse_init_flows[key]['streamblock'] + ' ' streamblock_list = streamblock_list + '"' frameloss_sbProfile = self.baseAPI.stc_create( 'Rfc2544StreamBlockProfile -under ' + frameloss_config + ' -Active TRUE -LocalActive TRUE') self.baseAPI.stc_config( frameloss_sbProfile, '-StreamBlockList ' + streamblock_list) self.baseAPI.stc_perform( 'ExpandBenchmarkConfigCommand', '-config ', frameloss_config) # attach the port before testing port_list = [forward_port_handle, reverse_port_handle] self.baseAPI.stc_attach_ports(port_list) # stc apply and begin to sequence test self.baseAPI.stc_apply() self.baseAPI.stc_perform("SequencerStart") # wait until complete self.baseAPI.stc_waituntilcomplete() # get result db resultsdb = self.baseAPI.stc_get( "system1.project.TestResultSetting", "-CurrentResultFileName") results_dict = self.baseAPI.stc_perform( 'QueryResult', '-DatabaseConnectionString', resultsdb, '-ResultPath RFC2544FrameLossTestResultDetailedSummaryView') #import pdb # pdb.set_trace() return True, results_dict def run_rfc2544_latency(self, forward_init_flows, reverse_init_flows): pass