1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from yardstick.common import exceptions
22 from yardstick.common import utils as common_utils
23 from yardstick.common import yaml_loader
24 from yardstick.network_services import utils as net_serv_utils
25 from yardstick.network_services.vnf_generic.vnf import sample_vnf
28 from lsapi import LsApi
30 LsApi = common_utils.ErrorClass
32 LOG = logging.getLogger(__name__)
35 class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
36 APP_NAME = 'LandslideTG'
38 def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
39 resource_helper_type=None):
40 if resource_helper_type is None:
41 resource_helper_type = LandslideResourceHelper
42 super(LandslideTrafficGen, self).__init__(name, vnfd, task_id,
43 setup_env_helper_type,
46 self.bin_path = net_serv_utils.get_nsb_option('bin_path')
48 self.runs_traffic = True
49 self.traffic_finished = False
50 self.session_profile = None
52 def listen_traffic(self, traffic_profile):
56 self.resource_helper.disconnect()
def instantiate(self, scenario_cfg, context_cfg):
    """Instantiate the traffic generator and prepare its test session.

    Runs the parent class instantiation, connects the resource helper,
    creates the test servers and SUTs listed in the VNFD ``config`` entries
    and finally loads the session profile.

    :param scenario_cfg: scenario configuration, forwarded to the parent
    :param context_cfg: context configuration, forwarded to the parent
    """
    super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
    self.resource_helper.connect()

    config = self.vnfd_helper['config']

    # Create test servers
    test_servers = [entry['test_server'] for entry in config]
    self.resource_helper.create_test_servers(test_servers)

    # Create SUTs: a plain loop, because the calls are made only for their
    # side effects and no result list is needed
    for entry in config:
        self.resource_helper.create_suts(entry['suts'])

    # Fill in test session based on session profile and test case options
    self._load_session_profile()
def run_traffic(self, traffic_profile):
    """Prepare the DMF and test session, then start the test run.

    :param traffic_profile: object providing ``update_dmf()`` and
        ``dmf_config``
    """
    helper = self.resource_helper
    helper.abort_running_tests()

    # Update DMF profile with related test case options
    options = self.scenario_helper.all_options
    traffic_profile.update_dmf(options)

    # Create DMF in test user library
    helper.create_dmf(traffic_profile.dmf_config)

    # Create/update test session in test user library
    profile = self.session_profile
    helper.create_test_session(profile)

    # Start test session
    helper.create_running_tests(profile['name'])
def collect_kpi(self):
    """Return whatever the resource helper reports as collected KPIs."""
    kpis = self.resource_helper.collect_kpi()
    return kpis
87 def wait_for_instantiate(self):
@staticmethod
def _update_session_suts(suts, testcase):
    """ Create SUT entry. Update related EPC block in session profile.

    For every SUT, the test case parameter named after the SUT role is
    looked up. A parameter of class 'Sut' receives the SUT name; one of
    class 'TestNode' receives the non-empty 'ip', 'phy' and 'nextHop'
    values of the SUT. Roles absent from the test case are only logged.

    :param suts: SUT descriptions, each holding at least 'role'
    :type suts: list(dict)
    :param testcase: test case block of the session profile, modified
        in place
    :type testcase: dict
    :returns dict: the same ``testcase`` object
    """
    for sut in suts:
        # Update session profile EPC element with SUT info from pod file
        tc_role = testcase['parameters'].get(sut['role'])
        if not tc_role:
            LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
            continue

        _param = {}
        if tc_role['class'] == 'Sut':
            _param['name'] = sut['name']
        elif tc_role['class'] == 'TestNode':
            _param.update({x: sut[x] for x in ('ip', 'phy', 'nextHop')
                           if x in sut and sut[x]})
        testcase['parameters'][sut['role']].update(_param)
    return testcase
108 def _update_session_test_servers(self, test_server, _tsgroup_index):
109 """ Update tsId, reservations, pre-resolved ARP in session profile """
110 # Update test server name
111 test_groups = self.session_profile['tsGroups']
112 test_groups[_tsgroup_index]['tsId'] = test_server['name']
114 # Update preResolvedArpAddress
115 arp_key = 'preResolvedArpAddress'
116 _preresolved_arp = test_server.get(arp_key) # list of dicts
118 test_groups[_tsgroup_index][arp_key] = _preresolved_arp
120 # Update reservations
121 if 'phySubnets' in test_server:
122 reservation = {'tsId': test_server['name'],
123 'tsIndex': _tsgroup_index,
124 'tsName': test_server['name'],
125 'phySubnets': test_server['phySubnets']}
126 if 'reservations' in self.session_profile:
127 self.session_profile['reservations'].append(reservation)
129 self.session_profile['reservePorts'] = 'true'
130 self.session_profile['reservations'] = [reservation]
@staticmethod
def _update_session_tc_params(tc_options, testcase):
    """Overwrite test case defaults with the given test case options.

    'AssociatedPhys' is stored at the top level of the test case; every
    other option is stored under its 'parameters' dict.

    :param tc_options: option name to value mapping
    :type tc_options: dict
    :param testcase: test case block of the session profile, modified
        in place
    :type testcase: dict
    :returns dict: the same ``testcase`` object
    """
    for _param_key in tc_options:
        if _param_key == 'AssociatedPhys':
            testcase[_param_key] = tc_options[_param_key]
        else:
            testcase['parameters'][_param_key] = tc_options[_param_key]
    return testcase
141 def _load_session_profile(self):
143 with common_utils.open_relative_file(
144 self.scenario_helper.scenario_cfg['session_profile'],
145 self.scenario_helper.task_path) as stream:
146 self.session_profile = yaml_loader.yaml_load(stream)
148 # Raise exception if number of entries differs in following files,
149 _config_files = ['pod file', 'session_profile file', 'test_case file']
150 # Count testcases number in all tsGroups of session profile
151 session_tests_num = [xx for x in self.session_profile['tsGroups']
152 for xx in x['testCases']]
153 # Create a set containing number of list elements in each structure
154 _config_files_blocks_num = [
156 (self.vnfd_helper['config'], # test_servers and suts info
158 self.scenario_helper.all_options['test_cases'])] # test case file
160 if len(set(_config_files_blocks_num)) != 1:
161 raise RuntimeError('Unequal number of elements. {}'.format(
162 dict(six.moves.zip_longest(_config_files,
163 _config_files_blocks_num))))
169 # Iterate over data structures to overwrite session profile defaults
170 # _config: single list element holding test servers and SUTs info
171 # _tc_options: single test case parameters
172 for _config, tc_options in zip(
173 self.vnfd_helper['config'], # test servers and SUTS
174 self.scenario_helper.all_options['test_cases']): # testcase
176 _ts_config = _config['test_server']
178 # Calculate test group/test case indexes based on test server name
179 if _ts_config['name'] in ts_names:
186 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
189 if _testcase['type'] != _ts_config['role']:
191 'Test type mismatch in TC#{} of test server {}'.format(
192 _testcase_idx, _ts_config['name']))
194 # Fill session profile with test servers parameters
195 if _ts_config['name'] not in ts_names:
196 self._update_session_test_servers(_ts_config, _tsgroup_idx)
197 ts_names.add(_ts_config['name'])
199 # Fill session profile with suts parameters
200 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
201 _testcase_idx].update(
202 self._update_session_suts(_config['suts'], _testcase))
204 # Update test case parameters
205 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
206 _testcase_idx].update(
207 self._update_session_tc_params(tc_options, _testcase))
210 class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
211 """Landslide TG helper class"""
213 REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409}
214 REST_API_CODES = {'NOT MODIFIED': 500810}
216 def __init__(self, setup_helper):
217 super(LandslideResourceHelper, self).__init__(setup_helper)
219 self.vnfd_helper = setup_helper.vnfd_helper
220 self.scenario_helper = setup_helper.scenario_helper
222 # TAS Manager config initialization
226 self.license_data = {}
228 # TCL session initialization
229 self._tcl = LandslideTclClient(LsTclHandler(), self)
231 self.session = requests.Session()
232 self.running_tests_uri = 'runningTests'
233 self.test_session_uri = 'testSessions'
234 self.test_serv_uri = 'testServers'
235 self.suts_uri = 'suts'
236 self.users_uri = 'users'
237 self.user_lib_uri = None
240 def abort_running_tests(self, timeout=60, delay=5):
241 """ Abort running test sessions, if any """
242 _start_time = time.time()
243 while time.time() < _start_time + timeout:
244 run_tests_states = {x['id']: x['testStateOrStep']
245 for x in self.get_running_tests()}
246 if not set(run_tests_states.values()).difference(
247 {'COMPLETE', 'COMPLETE_ERROR'}):
250 [self.stop_running_tests(running_test_id=_id, force=True)
251 for _id, _state in run_tests_states.items()
252 if 'COMPLETE' not in _state]
256 'Some test runs not stopped during {} seconds'.format(timeout))
def _build_url(self, resource, action=None):
    """ Build the URL of a REST API resource

    :param resource: REST API resource name
    :type resource: str
    :param action: actions name and value
    :type action: dict('name': <str>, 'value': <str>)
    :returns str: REST API resource name with optional action info
    :raises ValueError: if ``action`` is given without ``resource``
    """
    # Action is optional and accepted only in presence of resource param
    if action and not resource:
        raise ValueError("Resource name not provided")

    # A query string starts with a single '?'; further name=value pairs
    # are separated by '&' (previously each pair got its own '?')
    _action = ''
    if action:
        _action = '?' + '&'.join(
            '{}={}'.format(k, v) for k, v in action.items())

    return ''.join([self._url, resource, _action])
276 def get_response_params(self, method, resource, params=None):
277 """ Retrieve params from JSON response of specific resource URL
279 :param method: one of supported REST API methods
281 :param resource: URI, requested resource name
283 :param params: attributes to be found in JSON response
284 :type params: list(str)
287 params = params if params else []
288 response = self.exec_rest_request(method, resource)
289 # Get substring between last slash sign and question mark (if any)
290 url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0]
291 _response_json = response.json()
292 # Expect dict(), if URL last part and top dict key don't match
293 # Else, if they match, expect list()
294 k, v = list(_response_json.items())[0]
295 if k != url_last_part:
296 v = [v] # v: list(dict(str: str))
297 # Extract params, or whole list of dicts (without top level key)
299 _res.append({param: x[param] for param in params} if params else x)
302 def _create_user(self, auth, level=1):
305 :param auth: data to create user account on REST server
307 :param level: Landslide user permissions level
309 :returns int: user id
311 # Set expiration date in two years since account creation date
312 _exp_date = time.strftime(
313 '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2))
314 _username = auth['user']
315 _fields = {"contactInformation": "", "expiresOn": _exp_date,
316 "fullName": "Test User",
317 "isActive": "true", "level": level,
318 "password": auth['password'],
319 "username": _username}
320 _response = self.exec_rest_request('post', self.users_uri,
321 json_data=_fields, raise_exc=False)
322 _resp_json = _response.json()
323 if _response.status_code == self.REST_STATUS_CODES['CREATED']:
325 _id = _resp_json['id']
326 LOG.info("New user created: username='%s', id='%s'", _username,
328 elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']:
329 # User already exists
330 LOG.info("Account '%s' already exists.", _username)
332 _id = self._modify_user(_username, {"isActive": "true"})['id']
334 raise exceptions.RestApiError(
335 'Error during new user "{}" creation'.format(_username))
338 def _modify_user(self, username, fields):
339 """ Modify information about existing user
341 :param username: user name of account to be modified
343 :param fields: data to modify user account on REST server
345 :returns dict: user info
347 _response = self.exec_rest_request('post', self.users_uri,
348 action={'username': username},
349 json_data=fields, raise_exc=False)
350 if _response.status_code == self.REST_STATUS_CODES['OK']:
351 _response = _response.json()
353 raise exceptions.RestApiError(
354 'Error during user "{}" data update: {}'.format(
356 _response.status_code))
357 LOG.info("User account '%s' modified: '%s'", username, _response)
360 def _delete_user(self, username):
361 """ Delete user account
363 :param username: username field
365 :returns bool: True if succeeded
367 self.exec_rest_request('delete', self.users_uri,
368 action={'username': username})
def _get_users(self, username=None):
    """ Get user records from REST server

    :param username: username field
    :type username: None|str
    :returns list(dict): empty list, or user record, or list of all users
    """
    _response = self.get_response_params('get', self.users_uri)
    if not username:
        return _response
    # Keep only the records whose 'username' matches exactly
    return [u for u in _response if u['username'] == username]
382 def exec_rest_request(self, method, resource, action=None, json_data=None,
383 logs=True, raise_exc=True):
384 """ Execute REST API request, return response object
386 :param method: one of supported requests ('post', 'get', 'delete')
388 :param resource: URL of resource
390 :param action: data used to provide URI located after question mark
392 :param json_data: mandatory only for 'post' method
393 :type json_data: dict
394 :param logs: debug logs display flag
395 :type raise_exc: bool
396 :param raise_exc: if True, raise exception on REST API call error
397 :returns requests.Response(): REST API call response object
399 json_data = json_data if json_data else {}
400 action = action if action else {}
401 _method = method.upper()
402 method = method.lower()
403 if method not in ('post', 'get', 'delete'):
404 raise ValueError("Method '{}' not supported".format(_method))
406 if method == 'post' and not action:
407 if not (json_data and isinstance(json_data, collections.Mapping)):
409 'JSON data missing in {} request'.format(_method))
411 r = getattr(self.session, method)(self._build_url(resource, action),
413 if raise_exc and not r.ok:
414 msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format(
415 method, self._build_url(resource, action), r.reason)
416 raise exceptions.RestApiError(msg)
419 LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method,
421 LOG.debug("Response: %s", r.json())
425 """Connect to RESTful server using test user account"""
426 tas_info = self.vnfd_helper['mgmt-interface']
427 # Supported REST Server ports: HTTP - 8080, HTTPS - 8181
428 _port = '8080' if tas_info['proto'] == 'http' else '8181'
429 tas_info.update({'port': _port})
430 self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info)
431 self.session.headers.update({'Accept': 'application/json',
432 'Content-type': 'application/json'})
433 # Login with super user to create test user
434 self.session.auth = (
435 tas_info['super-user'], tas_info['super-user-password'])
436 LOG.info("Connect using superuser: server='%s'", self._url)
437 auth = {x: tas_info[x] for x in ('user', 'password')}
438 self._user_id = self._create_user(auth)
439 # Login with test user
440 self.session.auth = auth['user'], auth['password']
442 self.exec_rest_request('get', '')
444 self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri)
445 LOG.info("Login with test user: server='%s'", self._url)
446 # Read existing license
447 self.license_data['lic_id'] = tas_info['license']
450 self._tcl.connect(tas_info['ip'], *self.session.auth)
454 def disconnect(self):
456 self._tcl.disconnect()
459 self._terminated.value = 1
def create_dmf(self, dmf):
    """Create one DMF, or each DMF of a list, through the TCL client.

    :param dmf: a single DMF description or a list of them
    """
    if isinstance(dmf, list):
        for _dmf in dmf:
            self._tcl.create_dmf(_dmf)
    else:
        self._tcl.create_dmf(dmf)
def delete_dmf(self, dmf):
    """Delete one DMF, or each DMF of a list, through the TCL client.

    :param dmf: a single DMF description or a list of them
    """
    if isinstance(dmf, list):
        for _dmf in dmf:
            self._tcl.delete_dmf(_dmf)
    else:
        self._tcl.delete_dmf(dmf)
def create_suts(self, suts):
    """Create each SUT on the REST server, or update it if creation fails.

    The 'phy', 'nextHop' and 'role' keys are dropped before the request.
    Any response other than CREATED is treated as "already exists": the
    remaining fields are then applied to the SUT with that name.

    :param suts: SUT descriptions
    :type suts: list(dict)
    """
    unsupported_keys = {'phy', 'nextHop', 'role'}
    for _sut in suts:
        # Keep only supported keys in suts object
        sut_entry = {k: v for k, v in _sut.items()
                     if k not in unsupported_keys}
        _response = self.exec_rest_request(
            'post', self.suts_uri, json_data=sut_entry,
            logs=False, raise_exc=False)
        if _response.status_code != self.REST_STATUS_CODES['CREATED']:
            LOG.info(_response.reason)  # Failed to create
            _name = sut_entry.pop('name')
            # Modify existing SUT
            self.configure_sut(sut_name=_name, json_data=sut_entry)
        else:
            LOG.info("SUT created: %s", sut_entry)
def get_suts(self, suts_id=None):
    """Return one SUT record, or all of them.

    :param suts_id: id of the SUT to fetch; all SUTs when omitted
    :returns: JSON body of the single-SUT request, or the list produced by
        ``get_response_params`` for the whole collection
    """
    if suts_id:
        _suts = self.exec_rest_request(
            'get', '{}/{}'.format(self.suts_uri, suts_id)).json()
    else:
        _suts = self.get_response_params('get', self.suts_uri)

    return _suts
500 def configure_sut(self, sut_name, json_data):
501 """ Modify information of specific SUTs
503 :param sut_name: name of existing SUT
505 :param json_data: SUT settings
506 :type json_data: dict()
508 LOG.info("Modifying SUT information...")
509 _response = self.exec_rest_request('post',
511 action={'name': sut_name},
514 if _response.status_code not in {self.REST_STATUS_CODES[x] for x in
515 {'OK', 'NO CHANGE'}}:
516 raise exceptions.RestApiError(_response.reason)
518 LOG.info("Modified SUT: %s", sut_name)
def delete_suts(self, suts_ids=None):
    """Delete SUTs by id; with no ids given, delete every existing SUT.

    :param suts_ids: ids of the SUTs to delete
    :type suts_ids: None|list
    """
    if not suts_ids:
        # No explicit selection: collect the ids of all current SUTs
        _curr_suts = self.get_response_params('get', self.suts_uri)
        suts_ids = [x['id'] for x in _curr_suts]
    LOG.info("Deleting SUTs with following IDs: %s", suts_ids)
    for _id in suts_ids:
        self.exec_rest_request('delete',
                               '{}/{}'.format(self.suts_uri, _id))
        LOG.info("\tDone for SUT id: %s", _id)
530 def _check_test_servers_state(self, test_servers_ids=None, delay=10,
532 LOG.info("Waiting for related test servers state change to READY...")
533 # Wait on state change
534 _start_time = time.time()
535 while time.time() - _start_time < timeout:
536 ts_ids_not_ready = {x['id'] for x in
537 self.get_test_servers(test_servers_ids)
538 if x['state'] != 'READY'}
539 if ts_ids_not_ready == set():
544 'Test servers not in READY state after {} seconds.'.format(
547 def create_test_servers(self, test_servers):
548 """ Create test servers
550 :param test_servers: input data for test servers creation
551 mandatory fields: managementIp
552 optional fields: name
553 :type test_servers: list(dict)
556 for _ts in test_servers:
557 _msg = 'Created test server "%(name)s"'
558 _ts_ids.append(self._tcl.create_test_server(_ts))
559 if _ts.get('thread_model'):
560 _msg += ' in mode: "%(thread_model)s"'
563 self._check_test_servers_state(_ts_ids)
def get_test_servers(self, test_server_ids=None):
    """Return test server records, all of them or only the given ids.

    :param test_server_ids: ids of the test servers to fetch
    :type test_server_ids: None|list
    :returns list: test server records as returned by the REST server
    """
    if not test_server_ids:  # Get all test servers info
        _test_servers = self.exec_rest_request(
            'get', self.test_serv_uri).json()[self.test_serv_uri]
        LOG.info("Current test servers configuration: %s", _test_servers)
        return _test_servers

    # One request per id; each response body is one record
    _test_servers = []
    for _id in test_server_ids:
        _test_servers.append(self.exec_rest_request(
            'get', '{}/{}'.format(self.test_serv_uri, _id)).json())
    LOG.info("Current test servers configuration: %s", _test_servers)
    return _test_servers
579 def configure_test_servers(self, action, json_data=None,
580 test_server_ids=None):
581 if not test_server_ids:
582 test_server_ids = [x['id'] for x in self.get_test_servers()]
583 elif isinstance(test_server_ids, int):
584 test_server_ids = [test_server_ids]
585 for _id in test_server_ids:
586 self.exec_rest_request('post',
587 '{}/{}'.format(self.test_serv_uri, _id),
588 action=action, json_data=json_data)
589 LOG.info("Test server (id: %s) configuration done: %s", _id,
591 return test_server_ids
593 def delete_test_servers(self, test_servers_ids=None):
594 # Delete test servers
595 for _ts in self.get_test_servers(test_servers_ids):
596 self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri,
598 LOG.info("Deleted test server: %s", _ts['name'])
def create_test_session(self, test_session):
    """Tag the session with the test user's library id and create it.

    :param test_session: session profile; its 'library' key is overwritten
    :type test_session: dict
    """
    # Use tcl client to create session
    library_id = self._user_id
    test_session['library'] = library_id
    session_name = test_session['name']
    LOG.debug("Creating session='%s'", session_name)
    tcl_client = self._tcl
    tcl_client.create_test_session(test_session)
def get_test_session(self, test_session_name=None):
    """Fetch one test session, or all sessions, of the test user library.

    :param test_session_name: name of the session; all sessions when
        omitted
    :returns: JSON body of the REST response
    """
    if test_session_name:
        uri = 'libraries/{}/{}/{}'.format(self._user_id,
                                          self.test_session_uri,
                                          test_session_name)
    else:
        uri = self.user_lib_uri.format(self._user_id)
    _test_sessions = self.exec_rest_request('get', uri).json()
    return _test_sessions
616 def configure_test_session(self, template_name, test_session):
617 # Override specified test session parameters
618 LOG.info('Update test session parameters: %s', test_session['name'])
619 test_session.update({'library': self._user_id})
620 return self.exec_rest_request(
622 action={'action': 'overrideAndSaveAs'},
623 json_data=test_session,
624 resource='{}/{}'.format(self.user_lib_uri.format(self._user_id),
def delete_test_session(self, test_session):
    """Delete a test session from the test user library.

    :param test_session: last path element of the session resource
    :returns: whatever ``exec_rest_request`` returns for the request
    """
    library_uri = self.user_lib_uri.format(self._user_id)
    session_uri = '{}/{}'.format(library_uri, test_session)
    return self.exec_rest_request('delete', session_uri)
def create_running_tests(self, test_session_name):
    """Start a test session and remember the id of the resulting run.

    :param test_session_name: name of the session in the test user library
    :raises exceptions.RestApiError: if the response is not CREATED
    """
    payload = {'library': self._user_id, 'name': test_session_name}
    response = self.exec_rest_request('post', self.running_tests_uri,
                                      json_data=payload)
    if response.status_code == self.REST_STATUS_CODES['CREATED']:
        self.run_id = response.json()['id']
        return
    raise exceptions.RestApiError('Failed to start test session.')
640 def get_running_tests(self, running_test_id=None):
641 """Get JSON structure of specified running test entity
643 :param running_test_id: ID of created running test entity
644 :type running_test_id: int
645 :returns list: running tests entity
647 if not running_test_id:
649 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
650 _res = self.exec_rest_request('get', _res_name, logs=False).json()
651 # If no run_id specified, skip top level key in response dict.
652 # Else return JSON as list
653 return _res.get('runningTests', [_res])
655 def delete_running_tests(self, running_test_id=None):
656 if not running_test_id:
658 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
659 self.get_response_params('delete', _res_name)
660 LOG.info("Deleted running test with id: %s", running_test_id)
662 def _running_tests_action(self, running_test_id, action, json_data=None):
666 # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc'
667 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
668 self.exec_rest_request('post', _res_name, {'action': action},
670 LOG.debug("Executed action: '%s' on running test id: %s", action,
673 def stop_running_tests(self, running_test_id, json_data=None, force=False):
674 _action = 'abort' if force else 'stop'
675 self._running_tests_action(running_test_id, _action,
677 LOG.info('Performed action: "%s" to test run with id: %s', _action,
def check_running_test_state(self, run_id):
    """Return the 'testStateOrStep' value of a running test.

    :param run_id: id of the running test
    :returns: value of 'testStateOrStep' in the JSON response, or None if
        the key is absent
    """
    r = self.exec_rest_request('get',
                               '{}/{}'.format(self.running_tests_uri,
                                              run_id))
    return r.json().get("testStateOrStep")
def get_running_tests_results(self, run_id):
    """Fetch the 'measurements' sub-resource of a running test.

    :param run_id: id of the running test
    :returns: JSON body of the REST response
    """
    _res = self.exec_rest_request(
        'get',
        '{}/{}/{}'.format(self.running_tests_uri,
                          run_id,
                          'measurements')).json()
    return _res
694 def _write_results(self, results):
695 # Avoid None value at test session start
696 _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0
698 _res_tabs = results.get('tabs')
699 # Avoid parsing 'tab' dict key initially (missing or empty)
703 # Flatten nested dict holding Landslide KPIs of current test run
705 for _tab, _kpis in six.iteritems(_res_tabs):
706 for _kpi, _value in six.iteritems(_kpis):
707 # Combine table name and KPI name using delimiter "::"
708 _key = '::'.join([_tab, _kpi])
710 # Cast value from str to float
711 # Remove comma and/or measure units, e.g. "us"
712 flat_kpis_dict[_key] = float(
713 _value.split(' ')[0].replace(',', ''))
714 except ValueError: # E.g. if KPI represents datetime
716 LOG.info("Polling test results of test run id: %s. Elapsed time: %s "
717 "seconds", self.run_id, _elapsed_time)
718 return flat_kpis_dict
720 def collect_kpi(self):
721 if 'COMPLETE' in self.check_running_test_state(self.run_id):
722 self._result.update({'done': True})
724 _res = self.get_running_tests_results(self.run_id)
725 _kpis = self._write_results(_res)
727 _kpis.update({'run_id': int(self.run_id)})
728 _kpis.update({'iteration': _res['iteration']})
729 self._result.update(_kpis)
733 class LandslideTclClient(object):
734 """Landslide TG TCL client class"""
736 DEFAULT_TEST_NODE = {
737 'ethStatsEnabled': True,
738 'forcedEthInterface': '',
744 'numLinksOrNodes': 1,
747 'uniqueVlanAddr': False,
750 'vlanUserPriority': 0,
755 'ls::create -TestNode-{} -under $p_ -Type "eth"' \
756 ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \
757 ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \
758 ' -ForcedEthInterface "{forcedEthInterface}"' \
759 ' -EthStatsEnabled {ethStatsEnabled}' \
760 ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \
761 ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \
764 def __init__(self, tcl_handler, ts_context):
765 self.tcl_server_ip = None
767 self._library_id = None
768 self._basic_library_id = None
769 self._tcl = tcl_handler
770 self._ts_context = ts_context
773 # Test types names expected in session profile, test case and pod files
774 self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node",
777 self._class_param_config_handler = {
778 "Array": self._configure_array_param,
779 "TestNode": self._configure_test_node_param,
780 "Sut": self._configure_sut_param,
781 "Dmf": self._configure_dmf_param
784 def connect(self, tcl_server_ip, username, password):
785 """ Connect to TCL server with username and password
787 :param tcl_server_ip: TCL server IP address
788 :type tcl_server_ip: str
789 :param username: existing username on TCL server
791 :param password: password related to username on TCL server
794 LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username)
795 res = self._tcl.execute(
796 "ls::login {} {} {}".format(tcl_server_ip, username, password))
797 if 'java0x' not in res: # handle assignment reflects login success
798 raise exceptions.LandslideTclException(
799 "connect: login failed ='{}'.".format(res))
800 self._library_id = self._tcl.execute(
801 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
803 self._basic_library_id = self._get_library_id('Basic')
804 self.tcl_server_ip = tcl_server_ip
805 self._user = username
806 LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user,
808 self._basic_library_id)
810 def disconnect(self):
811 """ Disconnect from TCL server. Drop TCL connection configuration """
812 LOG.info("disconnect: server='%s' user='%s'",
813 self.tcl_server_ip, self._user)
814 self._tcl.execute("ls::logout")
815 self.tcl_server_ip = None
817 self._library_id = None
818 self._basic_library_id = None
820 def _add_test_server(self, name, ip):
822 # Check if test server exists with name equal to _ts_name
823 ts_id = int(self.resolve_test_server_name(name))
825 # Such test server does not exist. Attempt to create it
826 ts_id = self._tcl.execute(
827 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip))
831 # Failed to create test server, e.g. limit reached
833 'Failed to create test server: "{}". {}'.format(name,
def _update_license(self, name):
    """ Setup/update test server license

    The requested license is changed, and the test server modified, only
    when the current value differs from the configured license id.

    :param name: test server name
    :type name: str
    """
    # Retrieve current TsInfo configuration, result stored in handle "ts"
    self._tcl.execute(
        'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name))

    # Set license ID, if it differs from current one, update test server
    _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense')
    if _curr_lic_id != self._ts_context.license_data['lic_id']:
        self._tcl.execute('ls::config $ts -RequestedLicense {}'.format(
            self._ts_context.license_data['lic_id']))
        self._tcl.execute('ls::perform ModifyTs $ts')
854 def _set_thread_model(self, name, thread_model):
855 # Retrieve test server configuration, store it in handle "tsc"
856 _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][
859 'set tsc [ls::perform RetrieveTsConfiguration '
860 '-name "{}" {}]'.format(name, _cfguser_password))
861 # Configure ThreadModel, if it differs from current one
862 thread_model_map = {'Legacy': 'V0',
864 'Fireball': 'V1_FB3'}
865 _model = thread_model_map[thread_model]
866 _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel')
867 if _curr_model != _model:
869 'ls::config $tsc -ThreadModel "{}"'.format(_model))
871 'ls::perform ApplyTsConfiguration $tsc {}'.format(
874 def create_test_server(self, test_server):
875 _ts_thread_model = test_server.get('thread_model')
876 _ts_name = test_server['name']
878 ts_id = self._add_test_server(_ts_name, test_server['ip'])
880 self._update_license(_ts_name)
882 # Skip below code modifying thread_model if it is not defined
884 self._set_thread_model(_ts_name, _ts_thread_model)
def create_test_session(self, test_session):
    """ Create, configure and save Landslide test session object.

    Mandatory keys read here: 'name', 'description', 'tsGroups'.
    Optional keys: 'keywords', 'duration', 'iterations', 'reservePorts',
    'reservations', 'reportOptions'.

    :param test_session: Landslide TestSession object
    :type test_session: dict
    """
    LOG.info("create_test_session: name='%s'", test_session['name'])
    run = self._tcl.execute

    # Session handle "test_" plus its mandatory attributes
    run('set test_ [ls::create TestSession]')
    run('ls::config $test_ -Library {} -Name "{}"'.format(
        self._library_id, test_session['name']))
    run('ls::config $test_ -Description "{}"'.format(
        test_session['description']))

    # Optional scalar attributes, applied only when present
    if 'keywords' in test_session:
        run('ls::config $test_ -Keywords "{}"'.format(
            test_session['keywords']))
    if 'duration' in test_session:
        run('ls::config $test_ -Duration "{}"'.format(
            test_session['duration']))
    if 'iterations' in test_session:
        run('ls::config $test_ -Iterations "{}"'.format(
            test_session['iterations']))
    if test_session.get('reservePorts') == 'true':
        run('ls::config $test_ -Reserve Ports')

    for _reservation in test_session.get('reservations', []):
        self._configure_reservation(_reservation)

    if 'reportOptions' in test_session:
        self._configure_report_options(test_session['reportOptions'])

    _index = 0
    for _group in test_session['tsGroups']:
        self._configure_ts_group(_group, _index)
        _index += 1

    self._save_test_session()
925 def create_dmf(self, dmf):
926 """ Create, configure and save Landslide Data Message Flow object.
928 :param dmf: Landslide Data Message Flow object
931 self._tcl.execute('set dmf_ [ls::create Dmf]')
932 _lib_id = self._get_library_id(dmf['dmf']['library'])
933 self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format(
936 for _param_key in dmf:
937 if _param_key == 'dmf':
939 _param_value = dmf[_param_key]
940 if isinstance(_param_value, dict):
941 # Configure complex parameter
942 _tcl_cmd = 'ls::config $dmf_'
943 for _sub_param_key in _param_value:
944 _sub_param_value = _param_value[_sub_param_key]
945 if isinstance(_sub_param_value, str):
946 _tcl_cmd += ' -{} "{}"'.format(_sub_param_key,
949 _tcl_cmd += ' -{} {}'.format(_sub_param_key,
952 self._tcl.execute(_tcl_cmd)
954 # Configure simple parameter
955 if isinstance(_param_value, str):
957 'ls::config $dmf_ -{} "{}"'.format(_param_key,
961 'ls::config $dmf_ -{} {}'.format(_param_key,
965 def configure_dmf(self, dmf):
966 # Use create to reconfigure and overwrite existing dmf
def delete_dmf(self, dmf):
    """Not supported by the TCL client.

    :param dmf: ignored
    :raises NotImplementedError: always
    """
    raise NotImplementedError
973 # Call 'Validate' to set default values for missing parameters
974 res = self._tcl.execute('ls::perform Validate -Dmf $dmf_')
976 res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings')
977 LOG.error("_save_dmf: %s", res)
978 raise exceptions.LandslideTclException("_save_dmf: {}".format(res))
980 res = self._tcl.execute('ls::save $dmf_ -overwrite')
981 LOG.debug("_save_dmf: result (%s)", res)
983 def _configure_report_options(self, options):
984 for _option_key in options:
985 _option_value = options[_option_key]
986 if _option_key == 'format':
988 if _option_value == 'CSV':
991 'ls::config $test_.ReportOptions -Format {} '
992 '-Ts -3 -Tc -3'.format(_format))
995 'ls::config $test_.ReportOptions -{} {}'.format(
999 def _configure_ts_group(self, ts_group, ts_group_index):
1001 _ts_id = int(self.resolve_test_server_name(ts_group['tsId']))
1003 raise RuntimeError('Test server name "{}" does not exist.'.format(
1005 if _ts_id not in self.ts_ids:
1007 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format(
1009 self.ts_ids.add(_ts_id)
1010 for _case in ts_group.get('testCases', []):
1011 self._configure_tc_type(_case, ts_group_index)
1013 self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress'))
1015 def _configure_tc_type(self, tc, ts_group_index):
1016 if tc['type'] not in self._tc_types:
1017 raise RuntimeError('Test type {} not supported.'.format(
1019 tc['type'] = tc['type'].replace('_', ' ')
1020 res = self._tcl.execute(
1021 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format(
1022 self._basic_library_id, tc['type']))
1023 if 'Invalid' in res:
1024 raise RuntimeError('Test type {} not found in "Basic" '
1025 'library.'.format(tc['type']))
1027 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format(
1029 self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format(
1030 self._basic_library_id, tc['name']))
1032 'ls::config $tc_ -Description "{}"'.format(tc['type']))
1034 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type']))
1037 'ls::config $tc_ -Linked {}'.format(tc['linked']))
1038 if 'AssociatedPhys' in tc:
1039 self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format(
1040 tc['AssociatedPhys']))
1041 if 'parameters' in tc:
1042 self._configure_parameters(tc['parameters'])
1044 def _configure_parameters(self, params):
1045 self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]')
1046 for _param_key in sorted(params):
1047 _param_value = params[_param_key]
1048 if isinstance(_param_value, dict):
1049 # Configure complex parameter
1050 if _param_value['class'] in self._class_param_config_handler:
1051 self._class_param_config_handler[_param_value['class']](
1055 # Configure simple parameter
1057 'ls::create {} -under $p_ -Value "{}"'.format(
1061 def _configure_array_param(self, name, params):
1062 self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name))
1063 for param in params['array']:
1065 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name,
def _configure_test_node_param(self, name, params):
    """Configure a test node parameter through the TCL handler.

    The values in ``params`` are layered over ``DEFAULT_TEST_NODE``, the
    two boolean flags are converted to the lower-case strings TCL expects,
    and the result is substituted into ``TEST_NODE_CMD`` and executed.

    :param name: parameter name, passed as the positional field of
                 ``TEST_NODE_CMD``
    :param params: mapping of test node options overriding the defaults
    """
    # Work on a copy: updating DEFAULT_TEST_NODE in place would leak this
    # node's overrides (and the stringified flags) into every later call
    # that relies on the defaults.
    _params = dict(self.DEFAULT_TEST_NODE)
    _params.update(params)

    # TCL command expects lower case 'true' or 'false'
    for _flag in ('ethStatsEnabled', 'uniqueVlanAddr'):
        _params[_flag] = str(_params[_flag]).lower()

    self._tcl.execute(self.TEST_NODE_CMD.format(name, **_params))
1079 def _configure_sut_param(self, name, params):
1081 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name,
1084 def _configure_dmf_param(self, name, params):
1085 self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name))
1087 for _flow_index, _flow in enumerate(params['mainflows']):
1088 _lib_id = self._get_library_id(_flow['library'])
1090 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format(
1094 if not params.get('instanceGroups'):
1097 _instance_group = params['instanceGroups'][_flow_index]
1099 # Traffic Mixer parameters handling
1100 for _key in ['mixType', 'rate']:
1101 if _key in _instance_group:
1103 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format(
1104 _flow_index, _key, _instance_group[_key]))
1106 # Assignments parameters handling
1107 for _row_id, _row in enumerate(_instance_group.get('rows', [])):
1109 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} '
1110 '-OverridePort {} -ClientPort {} -Context {} -Role {} '
1111 '-PreferredTransport {} -RatingGroup {} '
1112 '-ServiceID {}'.format(
1113 _flow_index, _row_id, _row['node'],
1114 _row['overridePort'], _row['clientPort'],
1115 _row['context'], _row['role'], _row['transport'],
1116 _row['ratingGroup'], _row['serviceId']))
1118 def _configure_reservation(self, reservation):
1119 _ts_id = self.resolve_test_server_name(reservation['tsId'])
1121 'set reservation_ [ls::create Reservation -under $test_]')
1123 'ls::config $reservation_ -TsIndex {} -TsId {} '
1124 '-TsName "{}"'.format(reservation['tsIndex'],
1126 reservation['tsName']))
1127 for _subnet in reservation['phySubnets']:
1129 'set physubnet_ [ls::create PhySubnet -under $reservation_]')
1131 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" '
1132 '-NumIps {}'.format(_subnet['name'], _subnet['base'],
1133 _subnet['mask'], _subnet['numIps']))
1135 def _configure_preresolved_arp(self, pre_resolved_arp):
1136 if not pre_resolved_arp: # Pre-resolved ARP configuration not found
1138 for _entry in pre_resolved_arp:
1139 # TsGroup handle name should correspond in _configure_ts_group()
1141 'ls::create PreResolvedArpAddress -under $tss_ '
1142 '-StartingAddress "{StartingAddress}" '
1143 '-NumNodes {NumNodes}'.format(**_entry))
def delete_test_session(self, test_session):
    """Deleting a test session is not implemented.

    :param test_session: test session description; unused
    :raises NotImplementedError: always
    """
    raise NotImplementedError()
1148 def _save_test_session(self):
1149 # Call 'Validate' to set default values for missing parameters
1150 res = self._tcl.execute('ls::perform Validate -TestSession $test_')
1151 if res == 'Invalid':
1152 res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings')
1153 raise exceptions.LandslideTclException(
1154 "Test session validation failed. Server response: {}".format(
1157 self._tcl.execute('ls::save $test_ -overwrite')
1158 LOG.debug("Test session saved successfully.")
1160 def _get_library_id(self, library):
1161 _library_id = self._tcl.execute(
1162 "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format(
1170 _library_id = self._tcl.execute(
1171 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
1176 LOG.error("_get_library_id: library='%s' not found.", library)
1177 raise exceptions.LandslideTclException(
1178 "_get_library_id: library='{}' not found.".format(
def resolve_test_server_name(self, ts_name):
    """Run the ``ls::query TsId`` TCL command for ``ts_name``.

    :param ts_name: test server name substituted into the query
    :return: the value returned by the TCL handler for that query
    """
    query = "ls::query TsId {}".format(ts_name)
    return self._tcl.execute(query)
1187 class LsTclHandler(object):
1188 """Landslide TCL Handler class"""
1191 JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386')
1195 self._ls = LsApi(jre_path=self.JRE_PATH)
1197 "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format(
1200 def execute(self, command):
1201 res = self._ls.tcl(command)
1202 self.tcl_cmds[command] = res