1 # Copyright (c) 2018-2019 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from yardstick.common import exceptions
22 from yardstick.common import utils as common_utils
23 from yardstick.common import yaml_loader
24 from yardstick.network_services import utils as net_serv_utils
25 from yardstick.network_services.vnf_generic.vnf import sample_vnf
28 from lsapi import LsApi
30 LsApi = common_utils.ErrorClass
32 LOG = logging.getLogger(__name__)
35 class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
36 APP_NAME = 'LandslideTG'
38 def __init__(self, name, vnfd, setup_env_helper_type=None,
39 resource_helper_type=None):
40 if resource_helper_type is None:
41 resource_helper_type = LandslideResourceHelper
42 super(LandslideTrafficGen, self).__init__(name, vnfd,
43 setup_env_helper_type,
46 self.bin_path = net_serv_utils.get_nsb_option('bin_path')
48 self.runs_traffic = True
49 self.traffic_finished = False
50 self.session_profile = None
52 def listen_traffic(self, traffic_profile):
56 self.resource_helper.disconnect()
def instantiate(self, scenario_cfg, context_cfg):
    """ Prepare the traffic generator for a test run.

    Calls the parent class instantiate(), connects the resource helper,
    creates the test servers and SUTs listed in the VNFD 'config' section
    and finally loads the session profile.

    :param scenario_cfg: scenario configuration, passed to the parent class
    :param context_cfg: context configuration, passed to the parent class
    """
    super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
    self.resource_helper.connect()

    # Every element of the 'config' list holds one 'test_server' entry and
    # the 'suts' entries that belong to it.
    config = self.vnfd_helper['config']

    # Create test servers
    self.resource_helper.create_test_servers(
        [entry['test_server'] for entry in config])

    # Create SUTs. A plain loop is used because only the side effect is
    # wanted; no result list needs to be built.
    for entry in config:
        self.resource_helper.create_suts(entry['suts'])

    # Fill in test session based on session profile and test case options
    self._load_session_profile()
def run_traffic(self, traffic_profile):
    """ Configure and start a test run for the given traffic profile.

    Running tests are aborted first; then the DMF and the test session
    are (re)created through the resource helper and the session is started.

    :param traffic_profile: object providing update_dmf() and dmf_config
    """
    helper = self.resource_helper
    session = self.session_profile

    helper.abort_running_tests()
    # Update DMF profile with related test case options
    test_options = self.scenario_helper.all_options
    traffic_profile.update_dmf(test_options)
    # Create DMF in test user library
    helper.create_dmf(traffic_profile.dmf_config)
    # Create/update test session in test user library
    helper.create_test_session(session)
    # Start test session
    helper.create_running_tests(session['name'])
def collect_kpi(self):
    """ Return whatever the resource helper's collect_kpi() returns. """
    kpis = self.resource_helper.collect_kpi()
    return kpis
87 def wait_for_instantiate(self):
91 def _update_session_suts(suts, testcase):
92 """ Create SUT entry. Update related EPC block in session profile. """
94 # Update session profile EPC element with SUT info from pod file
95 tc_role = testcase['parameters'].get(sut['role'])
98 if tc_role['class'] == 'Sut':
99 _param['name'] = sut['name']
100 elif tc_role['class'] == 'TestNode':
101 _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'}
102 if x in sut and sut[x]})
103 testcase['parameters'][sut['role']].update(_param)
105 LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
108 def _update_session_test_servers(self, test_server, _tsgroup_index):
109 """ Update tsId, reservations, pre-resolved ARP in session profile """
110 # Update test server name
111 test_groups = self.session_profile['tsGroups']
112 test_groups[_tsgroup_index]['tsId'] = test_server['name']
114 # Update preResolvedArpAddress
115 arp_key = 'preResolvedArpAddress'
116 _preresolved_arp = test_server.get(arp_key) # list of dicts
118 test_groups[_tsgroup_index][arp_key] = _preresolved_arp
120 # Update reservations
121 if 'phySubnets' in test_server:
122 reservation = {'tsId': test_server['name'],
123 'tsIndex': _tsgroup_index,
124 'tsName': test_server['name'],
125 'phySubnets': test_server['phySubnets']}
126 if 'reservations' in self.session_profile:
127 self.session_profile['reservations'].append(reservation)
129 self.session_profile['reservePorts'] = 'true'
130 self.session_profile['reservations'] = [reservation]
132 def _update_session_library_name(self, test_session):
133 """Update DMF library name in session profile"""
134 for _ts_group in test_session['tsGroups']:
135 for _tc in _ts_group['testCases']:
137 for _mainflow in _tc['parameters']['Dmf']['mainflows']:
138 _mainflow['library'] = \
139 self.vnfd_helper.mgmt_interface['user']
144 def _update_session_tc_params(tc_options, testcase):
145 for _param_key in tc_options:
146 if _param_key == 'AssociatedPhys':
147 testcase[_param_key] = tc_options[_param_key]
149 testcase['parameters'][_param_key] = tc_options[_param_key]
152 def _load_session_profile(self):
154 with common_utils.open_relative_file(
155 self.scenario_helper.scenario_cfg['session_profile'],
156 self.scenario_helper.task_path) as stream:
157 self.session_profile = yaml_loader.yaml_load(stream)
159 # Raise exception if number of entries differs in following files,
160 _config_files = ['pod file', 'session_profile file', 'test_case file']
161 # Count testcases number in all tsGroups of session profile
162 session_tests_num = [xx for x in self.session_profile['tsGroups']
163 for xx in x['testCases']]
164 # Create a set containing number of list elements in each structure
165 _config_files_blocks_num = [
167 (self.vnfd_helper['config'], # test_servers and suts info
169 self.scenario_helper.all_options['test_cases'])] # test case file
171 if len(set(_config_files_blocks_num)) != 1:
172 raise RuntimeError('Unequal number of elements. {}'.format(
173 dict(six.moves.zip_longest(_config_files,
174 _config_files_blocks_num))))
180 # Iterate over data structures to overwrite session profile defaults
181 # _config: single list element holding test servers and SUTs info
182 # _tc_options: single test case parameters
183 for _config, tc_options in zip(
184 self.vnfd_helper['config'], # test servers and SUTS
185 self.scenario_helper.all_options['test_cases']): # testcase
187 _ts_config = _config['test_server']
189 # Calculate test group/test case indexes based on test server name
190 if _ts_config['name'] in ts_names:
197 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
200 if _testcase['type'] != _ts_config['role']:
202 'Test type mismatch in TC#{} of test server {}'.format(
203 _testcase_idx, _ts_config['name']))
205 # Fill session profile with test servers parameters
206 if _ts_config['name'] not in ts_names:
207 self._update_session_test_servers(_ts_config, _tsgroup_idx)
208 ts_names.add(_ts_config['name'])
210 # Fill session profile with suts parameters
211 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
212 _testcase_idx].update(
213 self._update_session_suts(_config['suts'], _testcase))
215 # Update test case parameters
216 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
217 _testcase_idx].update(
218 self._update_session_tc_params(tc_options, _testcase))
220 self._update_session_library_name(self.session_profile)
223 class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
224 """Landslide TG helper class"""
226 REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409}
227 REST_API_CODES = {'NOT MODIFIED': 500810}
229 def __init__(self, setup_helper):
230 super(LandslideResourceHelper, self).__init__(setup_helper)
232 self.vnfd_helper = setup_helper.vnfd_helper
233 self.scenario_helper = setup_helper.scenario_helper
235 # TAS Manager config initialization
239 self.license_data = {}
241 # TCL session initialization
242 self._tcl = LandslideTclClient(LsTclHandler(), self)
244 self.session = requests.Session()
245 self.running_tests_uri = 'runningTests'
246 self.test_session_uri = 'testSessions'
247 self.test_serv_uri = 'testServers'
248 self.suts_uri = 'suts'
249 self.users_uri = 'users'
250 self.user_lib_uri = None
253 def abort_running_tests(self, timeout=60, delay=5):
254 """ Abort running test sessions, if any """
255 _start_time = time.time()
256 while time.time() < _start_time + timeout:
257 run_tests_states = {x['id']: x['testStateOrStep']
258 for x in self.get_running_tests()}
259 if not set(run_tests_states.values()).difference(
260 {'COMPLETE', 'COMPLETE_ERROR'}):
263 [self.stop_running_tests(running_test_id=_id, force=True)
264 for _id, _state in run_tests_states.items()
265 if 'COMPLETE' not in _state]
269 'Some test runs not stopped during {} seconds'.format(timeout))
271 def _build_url(self, resource, action=None):
274 :param resource: REST API resource name
276 :param action: actions name and value
277 :type action: dict('name': <str>, 'value': <str>)
278 :returns str: REST API resource name with optional action info
280 # Action is optional and accepted only in presence of resource param
281 if action and not resource:
282 raise ValueError("Resource name not provided")
283 # Concatenate actions
284 _action = ''.join(['?{}={}'.format(k, v) for k, v in
285 action.items()]) if action else ''
287 return ''.join([self._url, resource, _action])
289 def get_response_params(self, method, resource, params=None):
290 """ Retrieve params from JSON response of specific resource URL
292 :param method: one of supported REST API methods
294 :param resource: URI, requested resource name
296 :param params: attributes to be found in JSON response
297 :type params: list(str)
300 params = params if params else []
301 response = self.exec_rest_request(method, resource)
302 # Get substring between last slash sign and question mark (if any)
303 url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0]
304 _response_json = response.json()
305 # Expect dict(), if URL last part and top dict key don't match
306 # Else, if they match, expect list()
307 k, v = list(_response_json.items())[0]
308 if k != url_last_part:
309 v = [v] # v: list(dict(str: str))
310 # Extract params, or whole list of dicts (without top level key)
312 _res.append({param: x[param] for param in params} if params else x)
315 def _create_user(self, auth, level=1):
318 :param auth: data to create user account on REST server
320 :param level: Landslide user permissions level
322 :returns int: user id
324 # Set expiration date in two years since account creation date
325 _exp_date = time.strftime(
326 '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2))
327 _username = auth['user']
328 _fields = {"contactInformation": "", "expiresOn": _exp_date,
329 "fullName": "Test User",
330 "isActive": "true", "level": level,
331 "password": auth['password'],
332 "username": _username}
333 _response = self.exec_rest_request('post', self.users_uri,
334 json_data=_fields, raise_exc=False)
335 _resp_json = _response.json()
336 if _response.status_code == self.REST_STATUS_CODES['CREATED']:
338 _id = _resp_json['id']
339 LOG.info("New user created: username='%s', id='%s'", _username,
341 elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']:
342 # User already exists
343 LOG.info("Account '%s' already exists.", _username)
345 _id = self._modify_user(_username, {"isActive": "true"})['id']
347 raise exceptions.RestApiError(
348 'Error during new user "{}" creation'.format(_username))
351 def _modify_user(self, username, fields):
352 """ Modify information about existing user
354 :param username: user name of account to be modified
356 :param fields: data to modify user account on REST server
358 :returns dict: user info
360 _response = self.exec_rest_request('post', self.users_uri,
361 action={'username': username},
362 json_data=fields, raise_exc=False)
363 if _response.status_code == self.REST_STATUS_CODES['OK']:
364 _response = _response.json()
366 raise exceptions.RestApiError(
367 'Error during user "{}" data update: {}'.format(
369 _response.status_code))
370 LOG.info("User account '%s' modified: '%s'", username, _response)
373 def _delete_user(self, username):
374 """ Delete user account
376 :param username: username field
378 :returns bool: True if succeeded
380 self.exec_rest_request('delete', self.users_uri,
381 action={'username': username})
383 def _get_users(self, username=None):
384 """ Get user records from REST server
386 :param username: username field
387 :type username: None|str
388 :returns list(dict): empty list, or user record, or list of all users
390 _response = self.get_response_params('get', self.users_uri)
391 _res = [u for u in _response if
392 u['username'] == username] if username else _response
395 def exec_rest_request(self, method, resource, action=None, json_data=None,
396 logs=True, raise_exc=True):
397 """ Execute REST API request, return response object
399 :param method: one of supported requests ('post', 'get', 'delete')
401 :param resource: URL of resource
403 :param action: data used to provide URI located after question mark
405 :param json_data: mandatory only for 'post' method
406 :type json_data: dict
407 :param logs: debug logs display flag
408 :type raise_exc: bool
409 :param raise_exc: if True, raise exception on REST API call error
410 :returns requests.Response(): REST API call response object
412 json_data = json_data if json_data else {}
413 action = action if action else {}
414 _method = method.upper()
415 method = method.lower()
416 if method not in ('post', 'get', 'delete'):
417 raise ValueError("Method '{}' not supported".format(_method))
419 if method == 'post' and not action:
420 if not (json_data and isinstance(json_data, collections.Mapping)):
422 'JSON data missing in {} request'.format(_method))
424 r = getattr(self.session, method)(self._build_url(resource, action),
426 if raise_exc and not r.ok:
427 msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format(
428 method, self._build_url(resource, action), r.reason)
429 raise exceptions.RestApiError(msg)
432 LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method,
434 LOG.debug("Response: %s", r.json())
438 """Connect to RESTful server using test user account"""
439 tas_info = self.vnfd_helper['mgmt-interface']
440 # Supported REST Server ports: HTTP - 8080, HTTPS - 8181
441 _port = '8080' if tas_info['proto'] == 'http' else '8181'
442 tas_info.update({'port': _port})
443 self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info)
444 self.session.headers.update({'Accept': 'application/json',
445 'Content-type': 'application/json'})
446 # Login with super user to create test user
447 self.session.auth = (
448 tas_info['super-user'], tas_info['super-user-password'])
449 LOG.info("Connect using superuser: server='%s'", self._url)
450 auth = {x: tas_info[x] for x in ('user', 'password')}
451 self._user_id = self._create_user(auth)
452 # Login with test user
453 self.session.auth = auth['user'], auth['password']
455 self.exec_rest_request('get', '')
457 self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri)
458 LOG.info("Login with test user: server='%s'", self._url)
459 # Read existing license
460 self.license_data['lic_id'] = tas_info['license']
463 self._tcl.connect(tas_info['ip'], *self.session.auth)
467 def disconnect(self):
469 self._tcl.disconnect()
472 self._terminated.value = 1
474 def create_dmf(self, dmf):
475 if isinstance(dmf, dict):
478 # Update DMF library name in traffic profile
480 {'library': self.vnfd_helper.mgmt_interface['user']})
481 # Create DMF on Landslide server
482 self._tcl.create_dmf(_dmf)
484 def delete_dmf(self, dmf):
485 if isinstance(dmf, list):
487 self._tcl.delete_dmf(_dmf)
489 self._tcl.delete_dmf(dmf)
491 def create_suts(self, suts):
492 # Keep only supported keys in suts object
494 sut_entry = {k: v for k, v in _sut.items()
495 if k not in {'phy', 'nextHop', 'role'}}
496 _response = self.exec_rest_request(
497 'post', self.suts_uri, json_data=sut_entry,
498 logs=False, raise_exc=False)
499 if _response.status_code != self.REST_STATUS_CODES['CREATED']:
500 LOG.info(_response.reason) # Failed to create
501 _name = sut_entry.pop('name')
502 # Modify existing SUT
503 self.configure_sut(sut_name=_name, json_data=sut_entry)
505 LOG.info("SUT created: %s", sut_entry)
507 def get_suts(self, suts_id=None):
509 _suts = self.exec_rest_request(
510 'get', '{}/{}'.format(self.suts_uri, suts_id)).json()
512 _suts = self.get_response_params('get', self.suts_uri)
516 def configure_sut(self, sut_name, json_data):
517 """ Modify information of specific SUTs
519 :param sut_name: name of existing SUT
521 :param json_data: SUT settings
522 :type json_data: dict()
524 LOG.info("Modifying SUT information...")
525 _response = self.exec_rest_request('post',
527 action={'name': sut_name},
530 if _response.status_code not in {self.REST_STATUS_CODES[x] for x in
531 {'OK', 'NO CHANGE'}}:
532 raise exceptions.RestApiError(_response.reason)
534 LOG.info("Modified SUT: %s", sut_name)
536 def delete_suts(self, suts_ids=None):
538 _curr_suts = self.get_response_params('get', self.suts_uri)
539 suts_ids = [x['id'] for x in _curr_suts]
540 LOG.info("Deleting SUTs with following IDs: %s", suts_ids)
542 self.exec_rest_request('delete',
543 '{}/{}'.format(self.suts_uri, _id))
544 LOG.info("\tDone for SUT id: %s", _id)
546 def _check_test_servers_state(self, test_servers_ids=None, delay=10,
548 LOG.info("Waiting for related test servers state change to READY...")
549 # Wait on state change
550 _start_time = time.time()
551 while time.time() - _start_time < timeout:
552 ts_ids_not_ready = {x['id'] for x in
553 self.get_test_servers(test_servers_ids)
554 if x['state'] != 'READY'}
555 if ts_ids_not_ready == set():
560 'Test servers not in READY state after {} seconds.'.format(
563 def create_test_servers(self, test_servers):
564 """ Create test servers
566 :param test_servers: input data for test servers creation
567 mandatory fields: managementIp
568 optional fields: name
569 :type test_servers: list(dict)
572 for _ts in test_servers:
573 _msg = 'Created test server "%(name)s"'
574 _ts_ids.append(self._tcl.create_test_server(_ts))
575 if _ts.get('thread_model'):
576 _msg += ' in mode: "%(thread_model)s"'
579 self._check_test_servers_state(_ts_ids)
581 def get_test_servers(self, test_server_ids=None):
582 if not test_server_ids: # Get all test servers info
583 _test_servers = self.exec_rest_request(
584 'get', self.test_serv_uri).json()[self.test_serv_uri]
585 LOG.info("Current test servers configuration: %s", _test_servers)
589 for _id in test_server_ids:
590 _test_servers.append(self.exec_rest_request(
591 'get', '{}/{}'.format(self.test_serv_uri, _id)).json())
592 LOG.info("Current test servers configuration: %s", _test_servers)
595 def configure_test_servers(self, action, json_data=None,
596 test_server_ids=None):
597 if not test_server_ids:
598 test_server_ids = [x['id'] for x in self.get_test_servers()]
599 elif isinstance(test_server_ids, int):
600 test_server_ids = [test_server_ids]
601 for _id in test_server_ids:
602 self.exec_rest_request('post',
603 '{}/{}'.format(self.test_serv_uri, _id),
604 action=action, json_data=json_data)
605 LOG.info("Test server (id: %s) configuration done: %s", _id,
607 return test_server_ids
609 def delete_test_servers(self, test_servers_ids=None):
610 # Delete test servers
611 for _ts in self.get_test_servers(test_servers_ids):
612 self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri,
614 LOG.info("Deleted test server: %s", _ts['name'])
616 def create_test_session(self, test_session):
617 # Use tcl client to create session
618 test_session['library'] = self._user_id
620 # If no traffic duration set in test case, use predefined default value
622 test_session['duration'] = self.scenario_helper.all_options.get(
624 test_session['duration'])
626 LOG.debug("Creating session='%s'", test_session['name'])
627 self._tcl.create_test_session(test_session)
629 def get_test_session(self, test_session_name=None):
630 if test_session_name:
631 uri = 'libraries/{}/{}/{}'.format(self._user_id,
632 self.test_session_uri,
635 uri = self.user_lib_uri.format(self._user_id)
636 _test_sessions = self.exec_rest_request('get', uri).json()
637 return _test_sessions
639 def configure_test_session(self, template_name, test_session):
640 # Override specified test session parameters
641 LOG.info('Update test session parameters: %s', test_session['name'])
642 test_session.update({'library': self._user_id})
643 return self.exec_rest_request(
645 action={'action': 'overrideAndSaveAs'},
646 json_data=test_session,
647 resource='{}/{}'.format(self.user_lib_uri.format(self._user_id),
def delete_test_session(self, test_session):
    """ Issue a 'delete' request for a test session of the test user library

    :param test_session: value appended as last segment of the resource URI
    :returns: result of exec_rest_request()
    """
    library_uri = self.user_lib_uri.format(self._user_id)
    session_uri = '{}/{}'.format(library_uri, test_session)
    return self.exec_rest_request('delete', session_uri)
def create_running_tests(self, test_session_name):
    """ Start a test session and remember the id of the created test run

    :param test_session_name: name of the test session to be started
    :raises RestApiError: if response status code is not 'CREATED'
    """
    payload = {'library': self._user_id, 'name': test_session_name}
    response = self.exec_rest_request('post', self.running_tests_uri,
                                      json_data=payload)
    if response.status_code == self.REST_STATUS_CODES['CREATED']:
        # Keep id of the new running test in self.run_id
        self.run_id = response.json()['id']
        return
    raise exceptions.RestApiError('Failed to start test session.')
663 def get_running_tests(self, running_test_id=None):
664 """Get JSON structure of specified running test entity
666 :param running_test_id: ID of created running test entity
667 :type running_test_id: int
668 :returns list: running tests entity
670 if not running_test_id:
672 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
673 _res = self.exec_rest_request('get', _res_name, logs=False).json()
674 # If no run_id specified, skip top level key in response dict.
675 # Else return JSON as list
676 return _res.get('runningTests', [_res])
678 def delete_running_tests(self, running_test_id=None):
679 if not running_test_id:
681 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
682 self.get_response_params('delete', _res_name)
683 LOG.info("Deleted running test with id: %s", running_test_id)
685 def _running_tests_action(self, running_test_id, action, json_data=None):
689 # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc'
690 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
691 self.exec_rest_request('post', _res_name, {'action': action},
693 LOG.debug("Executed action: '%s' on running test id: %s", action,
696 def stop_running_tests(self, running_test_id, json_data=None, force=False):
697 _action = 'abort' if force else 'stop'
698 self._running_tests_action(running_test_id, _action,
700 LOG.info('Performed action: "%s" to test run with id: %s', _action,
703 def check_running_test_state(self, run_id):
704 r = self.exec_rest_request('get',
705 '{}/{}'.format(self.running_tests_uri,
707 return r.json().get("testStateOrStep")
709 def get_running_tests_results(self, run_id):
710 _res = self.exec_rest_request(
712 '{}/{}/{}'.format(self.running_tests_uri,
714 'measurements')).json()
717 def _write_results(self, results):
718 # Avoid None value at test session start
719 _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0
721 _res_tabs = results.get('tabs')
722 # Avoid parsing 'tab' dict key initially (missing or empty)
726 # Flatten nested dict holding Landslide KPIs of current test run
728 for _tab, _kpis in six.iteritems(_res_tabs):
729 for _kpi, _value in six.iteritems(_kpis):
730 # Combine table name and KPI name using delimiter "::"
731 _key = '::'.join([_tab, _kpi])
733 # Cast value from str to float
734 # Remove comma and/or measure units, e.g. "us"
735 flat_kpis_dict[_key] = float(
736 _value.split(' ')[0].replace(',', ''))
737 except ValueError: # E.g. if KPI represents datetime
739 LOG.info("Polling test results of test run id: %s. Elapsed time: %s "
740 "seconds", self.run_id, _elapsed_time)
741 return flat_kpis_dict
743 def collect_kpi(self):
744 if 'COMPLETE' in self.check_running_test_state(self.run_id):
745 self._result.update({'done': True})
747 _res = self.get_running_tests_results(self.run_id)
748 _kpis = self._write_results(_res)
750 _kpis.update({'run_id': int(self.run_id)})
751 _kpis.update({'iteration': _res['iteration']})
752 self._result.update(_kpis)
756 class LandslideTclClient(object):
757 """Landslide TG TCL client class"""
759 DEFAULT_TEST_NODE = {
760 'ethStatsEnabled': True,
761 'forcedEthInterface': '',
767 'numLinksOrNodes': 1,
770 'uniqueVlanAddr': False,
773 'vlanUserPriority': 0,
778 'ls::create -TestNode-{} -under $p_ -Type "eth"' \
779 ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \
780 ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \
781 ' -ForcedEthInterface "{forcedEthInterface}"' \
782 ' -EthStatsEnabled {ethStatsEnabled}' \
783 ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \
784 ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \
787 def __init__(self, tcl_handler, ts_context):
788 self.tcl_server_ip = None
790 self._library_id = None
791 self._basic_library_id = None
792 self._tcl = tcl_handler
793 self._ts_context = ts_context
796 # Test types names expected in session profile, test case and pod files
797 self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node",
800 self._class_param_config_handler = {
801 "Array": self._configure_array_param,
802 "TestNode": self._configure_test_node_param,
803 "Sut": self._configure_sut_param,
804 "Dmf": self._configure_dmf_param
807 def connect(self, tcl_server_ip, username, password):
808 """ Connect to TCL server with username and password
810 :param tcl_server_ip: TCL server IP address
811 :type tcl_server_ip: str
812 :param username: existing username on TCL server
814 :param password: password related to username on TCL server
817 LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username)
818 res = self._tcl.execute(
819 "ls::login {} {} {}".format(tcl_server_ip, username, password))
820 if 'java0x' not in res: # handle assignment reflects login success
821 raise exceptions.LandslideTclException(
822 "connect: login failed ='{}'.".format(res))
823 self._library_id = self._tcl.execute(
824 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
826 self._basic_library_id = self._get_library_id('Basic')
827 self.tcl_server_ip = tcl_server_ip
828 self._user = username
829 LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user,
831 self._basic_library_id)
833 def disconnect(self):
834 """ Disconnect from TCL server. Drop TCL connection configuration """
835 LOG.info("disconnect: server='%s' user='%s'",
836 self.tcl_server_ip, self._user)
837 self._tcl.execute("ls::logout")
838 self.tcl_server_ip = None
840 self._library_id = None
841 self._basic_library_id = None
843 def _add_test_server(self, name, ip):
845 # Check if test server exists with name equal to _ts_name
846 ts_id = int(self.resolve_test_server_name(name))
848 # Such test server does not exist. Attempt to create it
849 ts_id = self._tcl.execute(
850 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip))
854 # Failed to create test server, e.g. limit reached
856 'Failed to create test server: "{}". {}'.format(name,
860 def _update_license(self, name):
861 """ Setup/update test server license
863 :param name: test server name
866 # Retrieve current TsInfo configuration, result stored in handle "ts"
868 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name))
870 # Set license ID, if it differs from current one, update test server
871 _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense')
872 if _curr_lic_id != self._ts_context.license_data['lic_id']:
873 self._tcl.execute('ls::config $ts -RequestedLicense {}'.format(
874 self._ts_context.license_data['lic_id']))
875 self._tcl.execute('ls::perform ModifyTs $ts')
877 def _set_thread_model(self, name, thread_model):
878 # Retrieve test server configuration, store it in handle "tsc"
879 _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][
882 'set tsc [ls::perform RetrieveTsConfiguration '
883 '-name "{}" {}]'.format(name, _cfguser_password))
884 # Configure ThreadModel, if it differs from current one
885 thread_model_map = {'Legacy': 'V0',
887 'Fireball': 'V1_FB3'}
888 _model = thread_model_map[thread_model]
889 _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel')
890 if _curr_model != _model:
892 'ls::config $tsc -ThreadModel "{}"'.format(_model))
894 'ls::perform ApplyTsConfiguration $tsc {}'.format(
897 def create_test_server(self, test_server):
898 _ts_thread_model = test_server.get('thread_model')
899 _ts_name = test_server['name']
901 ts_id = self._add_test_server(_ts_name, test_server['ip'])
903 self._update_license(_ts_name)
905 # Skip below code modifying thread_model if it is not defined
907 self._set_thread_model(_ts_name, _ts_thread_model)
def create_test_session(self, test_session):
    """ Create, configure and save Landslide test session object.

    Mandatory keys read from test_session: 'name', 'description',
    'tsGroups'. Optional keys: 'keywords', 'duration', 'iterations',
    'reservePorts', 'reservations', 'reportOptions'.

    :param test_session: Landslide TestSession object
    :type test_session: dict
    """
    LOG.info("create_test_session: name='%s'", test_session['name'])
    run_tcl = self._tcl.execute

    run_tcl('set test_ [ls::create TestSession]')
    run_tcl('ls::config $test_ -Library {} -Name "{}"'.format(
        self._library_id, test_session['name']))
    run_tcl('ls::config $test_ -Description "{}"'.format(
        test_session['description']))

    # Optional scalar settings, applied in this fixed order when present
    optional_commands = (
        ('keywords', 'ls::config $test_ -Keywords "{}"'),
        ('duration', 'ls::config $test_ -Duration "{}"'),
        ('iterations', 'ls::config $test_ -Iterations "{}"'),
    )
    for _key, _template in optional_commands:
        if _key in test_session:
            run_tcl(_template.format(test_session[_key]))

    # Ports are reserved only if the value is exactly the string 'true'
    if test_session.get('reservePorts') == 'true':
        run_tcl('ls::config $test_ -Reserve Ports')

    for _entry in test_session.get('reservations', []):
        self._configure_reservation(_entry)

    if 'reportOptions' in test_session:
        self._configure_report_options(test_session['reportOptions'])

    _position = 0
    for _ts_group in test_session['tsGroups']:
        self._configure_ts_group(_ts_group, _position)
        _position += 1

    self._save_test_session()
948 def create_dmf(self, dmf):
949 """ Create, configure and save Landslide Data Message Flow object.
951 :param dmf: Landslide Data Message Flow object
954 self._tcl.execute('set dmf_ [ls::create Dmf]')
955 _lib_id = self._get_library_id(dmf['dmf']['library'])
956 self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format(
959 for _param_key in dmf:
960 if _param_key == 'dmf':
962 _param_value = dmf[_param_key]
963 if isinstance(_param_value, dict):
964 # Configure complex parameter
965 _tcl_cmd = 'ls::config $dmf_'
966 for _sub_param_key in _param_value:
967 _sub_param_value = _param_value[_sub_param_key]
968 if isinstance(_sub_param_value, str):
969 _tcl_cmd += ' -{} "{}"'.format(_sub_param_key,
972 _tcl_cmd += ' -{} {}'.format(_sub_param_key,
975 self._tcl.execute(_tcl_cmd)
977 # Configure simple parameter
978 if isinstance(_param_value, str):
980 'ls::config $dmf_ -{} "{}"'.format(_param_key,
984 'ls::config $dmf_ -{} {}'.format(_param_key,
988 def configure_dmf(self, dmf):
989 # Use create to reconfigure and overwrite existing dmf
def delete_dmf(self, dmf):
    """ Not implemented: always raises NotImplementedError.

    :param dmf: unused
    """
    raise NotImplementedError()
996 # Call 'Validate' to set default values for missing parameters
997 res = self._tcl.execute('ls::perform Validate -Dmf $dmf_')
999 res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings')
1000 LOG.error("_save_dmf: %s", res)
1001 raise exceptions.LandslideTclException("_save_dmf: {}".format(res))
1003 res = self._tcl.execute('ls::save $dmf_ -overwrite')
1004 LOG.debug("_save_dmf: result (%s)", res)
1006 def _configure_report_options(self, options):
1007 for _option_key in options:
1008 _option_value = options[_option_key]
1009 if _option_key == 'format':
1011 if _option_value == 'CSV':
1014 'ls::config $test_.ReportOptions -Format {} '
1015 '-Ts -3 -Tc -3'.format(_format))
1018 'ls::config $test_.ReportOptions -{} {}'.format(
1022 def _configure_ts_group(self, ts_group, ts_group_index):
1024 _ts_id = int(self.resolve_test_server_name(ts_group['tsId']))
1026 raise RuntimeError('Test server name "{}" does not exist.'.format(
1028 if _ts_id not in self.ts_ids:
1030 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format(
1032 self.ts_ids.add(_ts_id)
1033 for _case in ts_group.get('testCases', []):
1034 self._configure_tc_type(_case, ts_group_index)
1036 self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress'))
1038 def _configure_tc_type(self, tc, ts_group_index):
1039 if tc['type'] not in self._tc_types:
1040 raise RuntimeError('Test type {} not supported.'.format(
1042 tc['type'] = tc['type'].replace('_', ' ')
1043 res = self._tcl.execute(
1044 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format(
1045 self._basic_library_id, tc['type']))
1046 if 'Invalid' in res:
1047 raise RuntimeError('Test type {} not found in "Basic" '
1048 'library.'.format(tc['type']))
1050 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format(
1052 self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format(
1053 self._basic_library_id, tc['name']))
1055 'ls::config $tc_ -Description "{}"'.format(tc['type']))
1057 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type']))
1060 'ls::config $tc_ -Linked {}'.format(tc['linked']))
1061 if 'AssociatedPhys' in tc:
1062 self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format(
1063 tc['AssociatedPhys']))
1064 if 'parameters' in tc:
1065 self._configure_parameters(tc['parameters'])
1067 def _configure_parameters(self, params):
1068 self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]')
1069 for _param_key in sorted(params):
1070 _param_value = params[_param_key]
1071 if isinstance(_param_value, dict):
1072 # Configure complex parameter
1073 if _param_value['class'] in self._class_param_config_handler:
1074 self._class_param_config_handler[_param_value['class']](
1078 # Configure simple parameter
1080 'ls::create {} -under $p_ -Value "{}"'.format(
1084 def _configure_array_param(self, name, params):
1085 self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name))
1086 for param in params['array']:
1088 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name,
1091 def _configure_test_node_param(self, name, params):
1092 _params = self.DEFAULT_TEST_NODE
1093 _params.update(params)
1095 # TCL command expects lower case 'true' or 'false'
1096 _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower()
1097 _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower()
1099 cmd = self.TEST_NODE_CMD.format(name, **_params)
1100 self._tcl.execute(cmd)
1102 def _configure_sut_param(self, name, params):
1104 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name,
1107 def _configure_dmf_param(self, name, params):
1108 self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name))
1110 for _flow_index, _flow in enumerate(params['mainflows']):
1111 _lib_id = self._get_library_id(_flow['library'])
1113 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format(
1117 if not params.get('instanceGroups'):
1120 _instance_group = params['instanceGroups'][_flow_index]
1122 # Traffic Mixer parameters handling
1123 for _key in ['mixType', 'rate']:
1124 if _key in _instance_group:
1126 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format(
1127 _flow_index, _key, _instance_group[_key]))
1129 # Assignments parameters handling
1130 for _row_id, _row in enumerate(_instance_group.get('rows', [])):
1132 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} '
1133 '-OverridePort {} -ClientPort {} -Context {} -Role {} '
1134 '-PreferredTransport {} -RatingGroup {} '
1135 '-ServiceID {}'.format(
1136 _flow_index, _row_id, _row['node'],
1137 _row['overridePort'], _row['clientPort'],
1138 _row['context'], _row['role'], _row['transport'],
1139 _row['ratingGroup'], _row['serviceId']))
1141 def _configure_reservation(self, reservation):
1142 _ts_id = self.resolve_test_server_name(reservation['tsId'])
1144 'set reservation_ [ls::create Reservation -under $test_]')
1146 'ls::config $reservation_ -TsIndex {} -TsId {} '
1147 '-TsName "{}"'.format(reservation['tsIndex'],
1149 reservation['tsName']))
1150 for _subnet in reservation['phySubnets']:
1152 'set physubnet_ [ls::create PhySubnet -under $reservation_]')
1154 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" '
1155 '-NumIps {}'.format(_subnet['name'], _subnet['base'],
1156 _subnet['mask'], _subnet['numIps']))
1158 def _configure_preresolved_arp(self, pre_resolved_arp):
1159 if not pre_resolved_arp: # Pre-resolved ARP configuration not found
1161 for _entry in pre_resolved_arp:
1162 # TsGroup handle name should correspond in _configure_ts_group()
1164 'ls::create PreResolvedArpAddress -under $tss_ '
1165 '-StartingAddress "{StartingAddress}" '
1166 '-NumNodes {NumNodes}'.format(**_entry))
def delete_test_session(self, test_session):
    """Delete a Landslide test session.

    Deletion is not implemented: the method raises unconditionally and
    never looks at its argument.

    :param test_session: test session to delete (unused)
    :raises NotImplementedError: on every call
    """
    raise NotImplementedError
1171 def _save_test_session(self):
1172 # Call 'Validate' to set default values for missing parameters
1173 res = self._tcl.execute('ls::perform Validate -TestSession $test_')
1174 if res == 'Invalid':
1175 res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings')
1176 raise exceptions.LandslideTclException(
1177 "Test session validation failed. Server response: {}".format(
1180 self._tcl.execute('ls::save $test_ -overwrite')
1181 LOG.debug("Test session saved successfully.")
1183 def _get_library_id(self, library):
1184 _library_id = self._tcl.execute(
1185 "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format(
1193 _library_id = self._tcl.execute(
1194 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
1199 LOG.error("_get_library_id: library='%s' not found.", library)
1200 raise exceptions.LandslideTclException(
1201 "_get_library_id: library='{}' not found.".format(
def resolve_test_server_name(self, ts_name):
    """Query the TCL API for the test server registered under a name.

    :param ts_name: test server name; inserted into the query unquoted
    :return: the raw result of executing ``ls::query TsId <ts_name>``
    """
    query = "ls::query TsId {}".format(ts_name)
    return self._tcl.execute(query)
1210 class LsTclHandler(object):
1211 """Landslide TCL Handler class"""
1214 JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386')
1218 self._ls = LsApi(jre_path=self.JRE_PATH)
1220 "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format(
1223 def execute(self, command):
1224 res = self._ls.tcl(command)
1225 self.tcl_cmds[command] = res