1 # Copyright (c) 2018 Intel Corporation
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
21 from yardstick.common import exceptions
22 from yardstick.common import utils as common_utils
23 from yardstick.common import yaml_loader
24 from yardstick.network_services import utils as net_serv_utils
25 from yardstick.network_services.vnf_generic.vnf import sample_vnf
28 from lsapi import LsApi
30 LsApi = common_utils.ErrorClass
32 LOG = logging.getLogger(__name__)
35 class LandslideTrafficGen(sample_vnf.SampleVNFTrafficGen):
36 APP_NAME = 'LandslideTG'
38 def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
39 resource_helper_type=None):
40 if resource_helper_type is None:
41 resource_helper_type = LandslideResourceHelper
42 super(LandslideTrafficGen, self).__init__(name, vnfd, task_id,
43 setup_env_helper_type,
46 self.bin_path = net_serv_utils.get_nsb_option('bin_path')
48 self.runs_traffic = True
49 self.traffic_finished = False
50 self.session_profile = None
52 def listen_traffic(self, traffic_profile):
56 self.resource_helper.disconnect()
58 def instantiate(self, scenario_cfg, context_cfg):
59 super(LandslideTrafficGen, self).instantiate(scenario_cfg, context_cfg)
60 self.resource_helper.connect()
63 test_servers = [x['test_server'] for x in self.vnfd_helper['config']]
64 self.resource_helper.create_test_servers(test_servers)
67 [self.resource_helper.create_suts(x['suts']) for x in
68 self.vnfd_helper['config']]
70 # Fill in test session based on session profile and test case options
71 self._load_session_profile()
73 def run_traffic(self, traffic_profile):
74 self.resource_helper.abort_running_tests()
75 # Update DMF profile with related test case options
76 traffic_profile.update_dmf(self.scenario_helper.all_options)
77 # Create DMF in test user library
78 self.resource_helper.create_dmf(traffic_profile.dmf_config)
79 # Create/update test session in test user library
80 self.resource_helper.create_test_session(self.session_profile)
82 self.resource_helper.create_running_tests(self.session_profile['name'])
84 def collect_kpi(self):
85 return self.resource_helper.collect_kpi()
87 def wait_for_instantiate(self):
91 def _update_session_suts(suts, testcase):
92 """ Create SUT entry. Update related EPC block in session profile. """
94 # Update session profile EPC element with SUT info from pod file
95 tc_role = testcase['parameters'].get(sut['role'])
98 if tc_role['class'] == 'Sut':
99 _param['name'] = sut['name']
100 elif tc_role['class'] == 'TestNode':
101 _param.update({x: sut[x] for x in {'ip', 'phy', 'nextHop'}
102 if x in sut and sut[x]})
103 testcase['parameters'][sut['role']].update(_param)
105 LOG.info('Unexpected SUT role in pod file: "%s".', sut['role'])
108 def _update_session_test_servers(self, test_server, _tsgroup_index):
109 """ Update tsId, reservations, pre-resolved ARP in session profile """
110 # Update test server name
111 test_groups = self.session_profile['tsGroups']
112 test_groups[_tsgroup_index]['tsId'] = test_server['name']
114 # Update preResolvedArpAddress
115 arp_key = 'preResolvedArpAddress'
116 _preresolved_arp = test_server.get(arp_key) # list of dicts
118 test_groups[_tsgroup_index][arp_key] = _preresolved_arp
120 # Update reservations
121 if 'phySubnets' in test_server:
122 reservation = {'tsId': test_server['name'],
123 'tsIndex': _tsgroup_index,
124 'tsName': test_server['name'],
125 'phySubnets': test_server['phySubnets']}
126 if 'reservations' in self.session_profile:
127 self.session_profile['reservations'].append(reservation)
129 self.session_profile['reservePorts'] = 'true'
130 self.session_profile['reservations'] = [reservation]
133 def _update_session_tc_params(tc_options, testcase):
134 for _param_key in tc_options:
135 if _param_key == 'AssociatedPhys':
136 testcase[_param_key] = tc_options[_param_key]
138 testcase['parameters'][_param_key] = tc_options[_param_key]
141 def _load_session_profile(self):
143 with common_utils.open_relative_file(
144 self.scenario_helper.scenario_cfg['session_profile'],
145 self.scenario_helper.task_path) as stream:
146 self.session_profile = yaml_loader.yaml_load(stream)
148 # Raise exception if number of entries differs in following files,
149 _config_files = ['pod file', 'session_profile file', 'test_case file']
150 # Count testcases number in all tsGroups of session profile
151 session_tests_num = [xx for x in self.session_profile['tsGroups']
152 for xx in x['testCases']]
153 # Create a set containing number of list elements in each structure
154 _config_files_blocks_num = [
156 (self.vnfd_helper['config'], # test_servers and suts info
158 self.scenario_helper.all_options['test_cases'])] # test case file
160 if len(set(_config_files_blocks_num)) != 1:
161 raise RuntimeError('Unequal number of elements. {}'.format(
162 dict(six.moves.zip_longest(_config_files,
163 _config_files_blocks_num))))
169 # Iterate over data structures to overwrite session profile defaults
170 # _config: single list element holding test servers and SUTs info
171 # _tc_options: single test case parameters
172 for _config, tc_options in zip(
173 self.vnfd_helper['config'], # test servers and SUTS
174 self.scenario_helper.all_options['test_cases']): # testcase
176 _ts_config = _config['test_server']
178 # Calculate test group/test case indexes based on test server name
179 if _ts_config['name'] in ts_names:
186 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
189 if _testcase['type'] != _ts_config['role']:
191 'Test type mismatch in TC#{} of test server {}'.format(
192 _testcase_idx, _ts_config['name']))
194 # Fill session profile with test servers parameters
195 if _ts_config['name'] not in ts_names:
196 self._update_session_test_servers(_ts_config, _tsgroup_idx)
197 ts_names.add(_ts_config['name'])
199 # Fill session profile with suts parameters
200 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
201 _testcase_idx].update(
202 self._update_session_suts(_config['suts'], _testcase))
204 # Update test case parameters
205 self.session_profile['tsGroups'][_tsgroup_idx]['testCases'][
206 _testcase_idx].update(
207 self._update_session_tc_params(tc_options, _testcase))
210 class LandslideResourceHelper(sample_vnf.ClientResourceHelper):
211 """Landslide TG helper class"""
213 REST_STATUS_CODES = {'OK': 200, 'CREATED': 201, 'NO CHANGE': 409}
214 REST_API_CODES = {'NOT MODIFIED': 500810}
216 def __init__(self, setup_helper):
217 super(LandslideResourceHelper, self).__init__(setup_helper)
219 self.vnfd_helper = setup_helper.vnfd_helper
220 self.scenario_helper = setup_helper.scenario_helper
222 # TAS Manager config initialization
226 self.license_data = {}
228 # TCL session initialization
229 self._tcl = LandslideTclClient(LsTclHandler(), self)
231 self.session = requests.Session()
232 self.running_tests_uri = 'runningTests'
233 self.test_session_uri = 'testSessions'
234 self.test_serv_uri = 'testServers'
235 self.suts_uri = 'suts'
236 self.users_uri = 'users'
237 self.user_lib_uri = None
240 def abort_running_tests(self, timeout=60, delay=5):
241 """ Abort running test sessions, if any """
242 _start_time = time.time()
243 while time.time() < _start_time + timeout:
244 run_tests_states = {x['id']: x['testStateOrStep']
245 for x in self.get_running_tests()}
246 if not set(run_tests_states.values()).difference(
247 {'COMPLETE', 'COMPLETE_ERROR'}):
250 [self.stop_running_tests(running_test_id=_id, force=True)
251 for _id, _state in run_tests_states.items()
252 if 'COMPLETE' not in _state]
256 'Some test runs not stopped during {} seconds'.format(timeout))
258 def _build_url(self, resource, action=None):
261 :param resource: REST API resource name
263 :param action: actions name and value
264 :type action: dict('name': <str>, 'value': <str>)
265 :returns str: REST API resource name with optional action info
267 # Action is optional and accepted only in presence of resource param
268 if action and not resource:
269 raise ValueError("Resource name not provided")
270 # Concatenate actions
271 _action = ''.join(['?{}={}'.format(k, v) for k, v in
272 action.items()]) if action else ''
274 return ''.join([self._url, resource, _action])
276 def get_response_params(self, method, resource, params=None):
277 """ Retrieve params from JSON response of specific resource URL
279 :param method: one of supported REST API methods
281 :param resource: URI, requested resource name
283 :param params: attributes to be found in JSON response
284 :type params: list(str)
287 params = params if params else []
288 response = self.exec_rest_request(method, resource)
289 # Get substring between last slash sign and question mark (if any)
290 url_last_part = resource.rsplit('/', 1)[-1].rsplit('?', 1)[0]
291 _response_json = response.json()
292 # Expect dict(), if URL last part and top dict key don't match
293 # Else, if they match, expect list()
294 k, v = list(_response_json.items())[0]
295 if k != url_last_part:
296 v = [v] # v: list(dict(str: str))
297 # Extract params, or whole list of dicts (without top level key)
299 _res.append({param: x[param] for param in params} if params else x)
302 def _create_user(self, auth, level=1):
305 :param auth: data to create user account on REST server
307 :param level: Landslide user permissions level
309 :returns int: user id
311 # Set expiration date in two years since account creation date
312 _exp_date = time.strftime(
313 '{}/%m/%d %H:%M %Z'.format(time.gmtime().tm_year + 2))
314 _username = auth['user']
315 _fields = {"contactInformation": "", "expiresOn": _exp_date,
316 "fullName": "Test User",
317 "isActive": "true", "level": level,
318 "password": auth['password'],
319 "username": _username}
320 _response = self.exec_rest_request('post', self.users_uri,
321 json_data=_fields, raise_exc=False)
322 _resp_json = _response.json()
323 if _response.status_code == self.REST_STATUS_CODES['CREATED']:
325 _id = _resp_json['id']
326 LOG.info("New user created: username='%s', id='%s'", _username,
328 elif _resp_json.get('apiCode') == self.REST_API_CODES['NOT MODIFIED']:
329 # User already exists
330 LOG.info("Account '%s' already exists.", _username)
332 _id = self._modify_user(_username, {"isActive": "true"})['id']
334 raise exceptions.RestApiError(
335 'Error during new user "{}" creation'.format(_username))
338 def _modify_user(self, username, fields):
339 """ Modify information about existing user
341 :param username: user name of account to be modified
343 :param fields: data to modify user account on REST server
345 :returns dict: user info
347 _response = self.exec_rest_request('post', self.users_uri,
348 action={'username': username},
349 json_data=fields, raise_exc=False)
350 if _response.status_code == self.REST_STATUS_CODES['OK']:
351 _response = _response.json()
353 raise exceptions.RestApiError(
354 'Error during user "{}" data update: {}'.format(
356 _response.status_code))
357 LOG.info("User account '%s' modified: '%s'", username, _response)
360 def _delete_user(self, username):
361 """ Delete user account
363 :param username: username field
365 :returns bool: True if succeeded
367 self.exec_rest_request('delete', self.users_uri,
368 action={'username': username})
370 def _get_users(self, username=None):
371 """ Get user records from REST server
373 :param username: username field
374 :type username: None|str
375 :returns list(dict): empty list, or user record, or list of all users
377 _response = self.get_response_params('get', self.users_uri)
378 _res = [u for u in _response if
379 u['username'] == username] if username else _response
382 def exec_rest_request(self, method, resource, action=None, json_data=None,
383 logs=True, raise_exc=True):
384 """ Execute REST API request, return response object
386 :param method: one of supported requests ('post', 'get', 'delete')
388 :param resource: URL of resource
390 :param action: data used to provide URI located after question mark
392 :param json_data: mandatory only for 'post' method
393 :type json_data: dict
394 :param logs: debug logs display flag
395 :type raise_exc: bool
396 :param raise_exc: if True, raise exception on REST API call error
397 :returns requests.Response(): REST API call response object
399 json_data = json_data if json_data else {}
400 action = action if action else {}
401 _method = method.upper()
402 method = method.lower()
403 if method not in ('post', 'get', 'delete'):
404 raise ValueError("Method '{}' not supported".format(_method))
406 if method == 'post' and not action:
407 if not (json_data and isinstance(json_data, collections.Mapping)):
409 'JSON data missing in {} request'.format(_method))
411 r = getattr(self.session, method)(self._build_url(resource, action),
413 if raise_exc and not r.ok:
414 msg = 'Failed to "{}" resource "{}". Reason: "{}"'.format(
415 method, self._build_url(resource, action), r.reason)
416 raise exceptions.RestApiError(msg)
419 LOG.debug("RC: %s | Request: %s | URL: %s", r.status_code, method,
421 LOG.debug("Response: %s", r.json())
425 """Connect to RESTful server using test user account"""
426 tas_info = self.vnfd_helper['mgmt-interface']
427 # Supported REST Server ports: HTTP - 8080, HTTPS - 8181
428 _port = '8080' if tas_info['proto'] == 'http' else '8181'
429 tas_info.update({'port': _port})
430 self._url = '{proto}://{ip}:{port}/api/'.format(**tas_info)
431 self.session.headers.update({'Accept': 'application/json',
432 'Content-type': 'application/json'})
433 # Login with super user to create test user
434 self.session.auth = (
435 tas_info['super-user'], tas_info['super-user-password'])
436 LOG.info("Connect using superuser: server='%s'", self._url)
437 auth = {x: tas_info[x] for x in ('user', 'password')}
438 self._user_id = self._create_user(auth)
439 # Login with test user
440 self.session.auth = auth['user'], auth['password']
442 self.exec_rest_request('get', '')
444 self.user_lib_uri = 'libraries/{{}}/{}'.format(self.test_session_uri)
445 LOG.info("Login with test user: server='%s'", self._url)
446 # Read existing license
447 self.license_data['lic_id'] = tas_info['license']
450 self._tcl.connect(tas_info['ip'], *self.session.auth)
454 def disconnect(self):
456 self._tcl.disconnect()
459 self._terminated.value = 1
461 def create_dmf(self, dmf):
462 if isinstance(dmf, list):
464 self._tcl.create_dmf(_dmf)
466 self._tcl.create_dmf(dmf)
468 def delete_dmf(self, dmf):
469 if isinstance(dmf, list):
471 self._tcl.delete_dmf(_dmf)
473 self._tcl.delete_dmf(dmf)
475 def create_suts(self, suts):
476 # Keep only supported keys in suts object
478 sut_entry = {k: v for k, v in _sut.items()
479 if k not in {'phy', 'nextHop', 'role'}}
480 _response = self.exec_rest_request(
481 'post', self.suts_uri, json_data=sut_entry,
482 logs=False, raise_exc=False)
483 if _response.status_code != self.REST_STATUS_CODES['CREATED']:
484 LOG.info(_response.reason) # Failed to create
485 _name = sut_entry.pop('name')
486 # Modify existing SUT
487 self.configure_sut(sut_name=_name, json_data=sut_entry)
489 LOG.info("SUT created: %s", sut_entry)
491 def get_suts(self, suts_id=None):
493 _suts = self.exec_rest_request(
494 'get', '{}/{}'.format(self.suts_uri, suts_id)).json()
496 _suts = self.get_response_params('get', self.suts_uri)
500 def configure_sut(self, sut_name, json_data):
501 """ Modify information of specific SUTs
503 :param sut_name: name of existing SUT
505 :param json_data: SUT settings
506 :type json_data: dict()
508 LOG.info("Modifying SUT information...")
509 _response = self.exec_rest_request('post',
511 action={'name': sut_name},
514 if _response.status_code not in {self.REST_STATUS_CODES[x] for x in
515 {'OK', 'NO CHANGE'}}:
516 raise exceptions.RestApiError(_response.reason)
518 LOG.info("Modified SUT: %s", sut_name)
520 def delete_suts(self, suts_ids=None):
522 _curr_suts = self.get_response_params('get', self.suts_uri)
523 suts_ids = [x['id'] for x in _curr_suts]
524 LOG.info("Deleting SUTs with following IDs: %s", suts_ids)
526 self.exec_rest_request('delete',
527 '{}/{}'.format(self.suts_uri, _id))
528 LOG.info("\tDone for SUT id: %s", _id)
530 def _check_test_servers_state(self, test_servers_ids=None, delay=10,
532 LOG.info("Waiting for related test servers state change to READY...")
533 # Wait on state change
534 _start_time = time.time()
535 while time.time() - _start_time < timeout:
536 ts_ids_not_ready = {x['id'] for x in
537 self.get_test_servers(test_servers_ids)
538 if x['state'] != 'READY'}
539 if ts_ids_not_ready == set():
544 'Test servers not in READY state after {} seconds.'.format(
547 def create_test_servers(self, test_servers):
548 """ Create test servers
550 :param test_servers: input data for test servers creation
551 mandatory fields: managementIp
552 optional fields: name
553 :type test_servers: list(dict)
556 for _ts in test_servers:
557 _msg = 'Created test server "%(name)s"'
558 _ts_ids.append(self._tcl.create_test_server(_ts))
559 if _ts.get('thread_model'):
560 _msg += ' in mode: "%(thread_model)s"'
563 self._check_test_servers_state(_ts_ids)
565 def get_test_servers(self, test_server_ids=None):
566 if not test_server_ids: # Get all test servers info
567 _test_servers = self.exec_rest_request(
568 'get', self.test_serv_uri).json()[self.test_serv_uri]
569 LOG.info("Current test servers configuration: %s", _test_servers)
573 for _id in test_server_ids:
574 _test_servers.append(self.exec_rest_request(
575 'get', '{}/{}'.format(self.test_serv_uri, _id)).json())
576 LOG.info("Current test servers configuration: %s", _test_servers)
579 def configure_test_servers(self, action, json_data=None,
580 test_server_ids=None):
581 if not test_server_ids:
582 test_server_ids = [x['id'] for x in self.get_test_servers()]
583 elif isinstance(test_server_ids, int):
584 test_server_ids = [test_server_ids]
585 for _id in test_server_ids:
586 self.exec_rest_request('post',
587 '{}/{}'.format(self.test_serv_uri, _id),
588 action=action, json_data=json_data)
589 LOG.info("Test server (id: %s) configuration done: %s", _id,
591 return test_server_ids
593 def delete_test_servers(self, test_servers_ids=None):
594 # Delete test servers
595 for _ts in self.get_test_servers(test_servers_ids):
596 self.exec_rest_request('delete', '{}/{}'.format(self.test_serv_uri,
598 LOG.info("Deleted test server: %s", _ts['name'])
600 def create_test_session(self, test_session):
601 # Use tcl client to create session
602 test_session['library'] = self._user_id
604 # If no traffic duration set in test case, use predefined default value
606 test_session['duration'] = self.scenario_helper.all_options.get(
608 test_session['duration'])
610 LOG.debug("Creating session='%s'", test_session['name'])
611 self._tcl.create_test_session(test_session)
613 def get_test_session(self, test_session_name=None):
614 if test_session_name:
615 uri = 'libraries/{}/{}/{}'.format(self._user_id,
616 self.test_session_uri,
619 uri = self.user_lib_uri.format(self._user_id)
620 _test_sessions = self.exec_rest_request('get', uri).json()
621 return _test_sessions
623 def configure_test_session(self, template_name, test_session):
624 # Override specified test session parameters
625 LOG.info('Update test session parameters: %s', test_session['name'])
626 test_session.update({'library': self._user_id})
627 return self.exec_rest_request(
629 action={'action': 'overrideAndSaveAs'},
630 json_data=test_session,
631 resource='{}/{}'.format(self.user_lib_uri.format(self._user_id),
634 def delete_test_session(self, test_session):
635 return self.exec_rest_request('delete', '{}/{}'.format(
636 self.user_lib_uri.format(self._user_id), test_session))
638 def create_running_tests(self, test_session_name):
639 r = self.exec_rest_request('post',
640 self.running_tests_uri,
641 json_data={'library': self._user_id,
642 'name': test_session_name})
643 if r.status_code != self.REST_STATUS_CODES['CREATED']:
644 raise exceptions.RestApiError('Failed to start test session.')
645 self.run_id = r.json()['id']
647 def get_running_tests(self, running_test_id=None):
648 """Get JSON structure of specified running test entity
650 :param running_test_id: ID of created running test entity
651 :type running_test_id: int
652 :returns list: running tests entity
654 if not running_test_id:
656 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
657 _res = self.exec_rest_request('get', _res_name, logs=False).json()
658 # If no run_id specified, skip top level key in response dict.
659 # Else return JSON as list
660 return _res.get('runningTests', [_res])
662 def delete_running_tests(self, running_test_id=None):
663 if not running_test_id:
665 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
666 self.get_response_params('delete', _res_name)
667 LOG.info("Deleted running test with id: %s", running_test_id)
669 def _running_tests_action(self, running_test_id, action, json_data=None):
673 # 'stop', 'abort', 'continue', 'update', 'sendTcCommand', 'sendOdc'
674 _res_name = '{}/{}'.format(self.running_tests_uri, running_test_id)
675 self.exec_rest_request('post', _res_name, {'action': action},
677 LOG.debug("Executed action: '%s' on running test id: %s", action,
680 def stop_running_tests(self, running_test_id, json_data=None, force=False):
681 _action = 'abort' if force else 'stop'
682 self._running_tests_action(running_test_id, _action,
684 LOG.info('Performed action: "%s" to test run with id: %s', _action,
687 def check_running_test_state(self, run_id):
688 r = self.exec_rest_request('get',
689 '{}/{}'.format(self.running_tests_uri,
691 return r.json().get("testStateOrStep")
693 def get_running_tests_results(self, run_id):
694 _res = self.exec_rest_request(
696 '{}/{}/{}'.format(self.running_tests_uri,
698 'measurements')).json()
701 def _write_results(self, results):
702 # Avoid None value at test session start
703 _elapsed_time = results['elapsedTime'] if results['elapsedTime'] else 0
705 _res_tabs = results.get('tabs')
706 # Avoid parsing 'tab' dict key initially (missing or empty)
710 # Flatten nested dict holding Landslide KPIs of current test run
712 for _tab, _kpis in six.iteritems(_res_tabs):
713 for _kpi, _value in six.iteritems(_kpis):
714 # Combine table name and KPI name using delimiter "::"
715 _key = '::'.join([_tab, _kpi])
717 # Cast value from str to float
718 # Remove comma and/or measure units, e.g. "us"
719 flat_kpis_dict[_key] = float(
720 _value.split(' ')[0].replace(',', ''))
721 except ValueError: # E.g. if KPI represents datetime
723 LOG.info("Polling test results of test run id: %s. Elapsed time: %s "
724 "seconds", self.run_id, _elapsed_time)
725 return flat_kpis_dict
727 def collect_kpi(self):
728 if 'COMPLETE' in self.check_running_test_state(self.run_id):
729 self._result.update({'done': True})
731 _res = self.get_running_tests_results(self.run_id)
732 _kpis = self._write_results(_res)
734 _kpis.update({'run_id': int(self.run_id)})
735 _kpis.update({'iteration': _res['iteration']})
736 self._result.update(_kpis)
740 class LandslideTclClient(object):
741 """Landslide TG TCL client class"""
743 DEFAULT_TEST_NODE = {
744 'ethStatsEnabled': True,
745 'forcedEthInterface': '',
751 'numLinksOrNodes': 1,
754 'uniqueVlanAddr': False,
757 'vlanUserPriority': 0,
762 'ls::create -TestNode-{} -under $p_ -Type "eth"' \
763 ' -Phy "{phy}" -Ip "{ip}" -NumLinksOrNodes {numLinksOrNodes}' \
764 ' -NextHop "{nextHop}" -Mac "{mac}" -MTU {mtu}' \
765 ' -ForcedEthInterface "{forcedEthInterface}"' \
766 ' -EthStatsEnabled {ethStatsEnabled}' \
767 ' -VlanId {vlanId} -VlanUserPriority {vlanUserPriority}' \
768 ' -NumVlan {numVlan} -UniqueVlanAddr {uniqueVlanAddr}' \
771 def __init__(self, tcl_handler, ts_context):
772 self.tcl_server_ip = None
774 self._library_id = None
775 self._basic_library_id = None
776 self._tcl = tcl_handler
777 self._ts_context = ts_context
780 # Test types names expected in session profile, test case and pod files
781 self._tc_types = {"SGW_Nodal", "SGW_Node", "MME_Nodal", "PGW_Node",
784 self._class_param_config_handler = {
785 "Array": self._configure_array_param,
786 "TestNode": self._configure_test_node_param,
787 "Sut": self._configure_sut_param,
788 "Dmf": self._configure_dmf_param
791 def connect(self, tcl_server_ip, username, password):
792 """ Connect to TCL server with username and password
794 :param tcl_server_ip: TCL server IP address
795 :type tcl_server_ip: str
796 :param username: existing username on TCL server
798 :param password: password related to username on TCL server
801 LOG.info("connect: server='%s' user='%s'", tcl_server_ip, username)
802 res = self._tcl.execute(
803 "ls::login {} {} {}".format(tcl_server_ip, username, password))
804 if 'java0x' not in res: # handle assignment reflects login success
805 raise exceptions.LandslideTclException(
806 "connect: login failed ='{}'.".format(res))
807 self._library_id = self._tcl.execute(
808 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
810 self._basic_library_id = self._get_library_id('Basic')
811 self.tcl_server_ip = tcl_server_ip
812 self._user = username
813 LOG.debug("connect: user='%s' me='%s' basic='%s'", self._user,
815 self._basic_library_id)
817 def disconnect(self):
818 """ Disconnect from TCL server. Drop TCL connection configuration """
819 LOG.info("disconnect: server='%s' user='%s'",
820 self.tcl_server_ip, self._user)
821 self._tcl.execute("ls::logout")
822 self.tcl_server_ip = None
824 self._library_id = None
825 self._basic_library_id = None
827 def _add_test_server(self, name, ip):
829 # Check if test server exists with name equal to _ts_name
830 ts_id = int(self.resolve_test_server_name(name))
832 # Such test server does not exist. Attempt to create it
833 ts_id = self._tcl.execute(
834 'ls::perform AddTs -Name "{}" -Ip "{}"'.format(name, ip))
838 # Failed to create test server, e.g. limit reached
840 'Failed to create test server: "{}". {}'.format(name,
844 def _update_license(self, name):
845 """ Setup/update test server license
847 :param name: test server name
850 # Retrieve current TsInfo configuration, result stored in handle "ts"
852 'set ts [ls::retrieve TsInfo -Name "{}"]'.format(name))
854 # Set license ID, if it differs from current one, update test server
855 _curr_lic_id = self._tcl.execute('ls::get $ts -RequestedLicense')
856 if _curr_lic_id != self._ts_context.license_data['lic_id']:
857 self._tcl.execute('ls::config $ts -RequestedLicense {}'.format(
858 self._ts_context.license_data['lic_id']))
859 self._tcl.execute('ls::perform ModifyTs $ts')
861 def _set_thread_model(self, name, thread_model):
862 # Retrieve test server configuration, store it in handle "tsc"
863 _cfguser_password = self._ts_context.vnfd_helper['mgmt-interface'][
866 'set tsc [ls::perform RetrieveTsConfiguration '
867 '-name "{}" {}]'.format(name, _cfguser_password))
868 # Configure ThreadModel, if it differs from current one
869 thread_model_map = {'Legacy': 'V0',
871 'Fireball': 'V1_FB3'}
872 _model = thread_model_map[thread_model]
873 _curr_model = self._tcl.execute('ls::get $tsc -ThreadModel')
874 if _curr_model != _model:
876 'ls::config $tsc -ThreadModel "{}"'.format(_model))
878 'ls::perform ApplyTsConfiguration $tsc {}'.format(
881 def create_test_server(self, test_server):
882 _ts_thread_model = test_server.get('thread_model')
883 _ts_name = test_server['name']
885 ts_id = self._add_test_server(_ts_name, test_server['ip'])
887 self._update_license(_ts_name)
889 # Skip below code modifying thread_model if it is not defined
891 self._set_thread_model(_ts_name, _ts_thread_model)
895 def create_test_session(self, test_session):
896 """ Create, configure and save Landslide test session object.
898 :param test_session: Landslide TestSession object
899 :type test_session: dict
901 LOG.info("create_test_session: name='%s'", test_session['name'])
902 self._tcl.execute('set test_ [ls::create TestSession]')
903 self._tcl.execute('ls::config $test_ -Library {} -Name "{}"'.format(
904 self._library_id, test_session['name']))
905 self._tcl.execute('ls::config $test_ -Description "{}"'.format(
906 test_session['description']))
907 if 'keywords' in test_session:
908 self._tcl.execute('ls::config $test_ -Keywords "{}"'.format(
909 test_session['keywords']))
910 if 'duration' in test_session:
911 self._tcl.execute('ls::config $test_ -Duration "{}"'.format(
912 test_session['duration']))
913 if 'iterations' in test_session:
914 self._tcl.execute('ls::config $test_ -Iterations "{}"'.format(
915 test_session['iterations']))
916 if 'reservePorts' in test_session:
917 if test_session['reservePorts'] == 'true':
918 self._tcl.execute('ls::config $test_ -Reserve Ports')
920 if 'reservations' in test_session:
921 for _reservation in test_session['reservations']:
922 self._configure_reservation(_reservation)
924 if 'reportOptions' in test_session:
925 self._configure_report_options(test_session['reportOptions'])
927 for _index, _group in enumerate(test_session['tsGroups']):
928 self._configure_ts_group(_group, _index)
930 self._save_test_session()
932 def create_dmf(self, dmf):
933 """ Create, configure and save Landslide Data Message Flow object.
935 :param dmf: Landslide Data Message Flow object
938 self._tcl.execute('set dmf_ [ls::create Dmf]')
939 _lib_id = self._get_library_id(dmf['dmf']['library'])
940 self._tcl.execute('ls::config $dmf_ -Library {} -Name "{}"'.format(
943 for _param_key in dmf:
944 if _param_key == 'dmf':
946 _param_value = dmf[_param_key]
947 if isinstance(_param_value, dict):
948 # Configure complex parameter
949 _tcl_cmd = 'ls::config $dmf_'
950 for _sub_param_key in _param_value:
951 _sub_param_value = _param_value[_sub_param_key]
952 if isinstance(_sub_param_value, str):
953 _tcl_cmd += ' -{} "{}"'.format(_sub_param_key,
956 _tcl_cmd += ' -{} {}'.format(_sub_param_key,
959 self._tcl.execute(_tcl_cmd)
961 # Configure simple parameter
962 if isinstance(_param_value, str):
964 'ls::config $dmf_ -{} "{}"'.format(_param_key,
968 'ls::config $dmf_ -{} {}'.format(_param_key,
972 def configure_dmf(self, dmf):
973 # Use create to reconfigure and overwrite existing dmf
976 def delete_dmf(self, dmf):
977 raise NotImplementedError
980 # Call 'Validate' to set default values for missing parameters
981 res = self._tcl.execute('ls::perform Validate -Dmf $dmf_')
983 res = self._tcl.execute('ls::get $dmf_ -ErrorsAndWarnings')
984 LOG.error("_save_dmf: %s", res)
985 raise exceptions.LandslideTclException("_save_dmf: {}".format(res))
987 res = self._tcl.execute('ls::save $dmf_ -overwrite')
988 LOG.debug("_save_dmf: result (%s)", res)
990 def _configure_report_options(self, options):
991 for _option_key in options:
992 _option_value = options[_option_key]
993 if _option_key == 'format':
995 if _option_value == 'CSV':
998 'ls::config $test_.ReportOptions -Format {} '
999 '-Ts -3 -Tc -3'.format(_format))
1002 'ls::config $test_.ReportOptions -{} {}'.format(
1006 def _configure_ts_group(self, ts_group, ts_group_index):
1008 _ts_id = int(self.resolve_test_server_name(ts_group['tsId']))
1010 raise RuntimeError('Test server name "{}" does not exist.'.format(
1012 if _ts_id not in self.ts_ids:
1014 'set tss_ [ls::create TsGroup -under $test_ -tsId {} ]'.format(
1016 self.ts_ids.add(_ts_id)
1017 for _case in ts_group.get('testCases', []):
1018 self._configure_tc_type(_case, ts_group_index)
1020 self._configure_preresolved_arp(ts_group.get('preResolvedArpAddress'))
1022 def _configure_tc_type(self, tc, ts_group_index):
1023 if tc['type'] not in self._tc_types:
1024 raise RuntimeError('Test type {} not supported.'.format(
1026 tc['type'] = tc['type'].replace('_', ' ')
1027 res = self._tcl.execute(
1028 'set tc_ [ls::retrieve testcase -libraryId {0} "{1}"]'.format(
1029 self._basic_library_id, tc['type']))
1030 if 'Invalid' in res:
1031 raise RuntimeError('Test type {} not found in "Basic" '
1032 'library.'.format(tc['type']))
1034 'ls::config $test_.TsGroup({}) -children-Tc $tc_'.format(
1036 self._tcl.execute('ls::config $tc_ -Library {0} -Name "{1}"'.format(
1037 self._basic_library_id, tc['name']))
1039 'ls::config $tc_ -Description "{}"'.format(tc['type']))
1041 'ls::config $tc_ -Keywords "GTP LTE {}"'.format(tc['type']))
1044 'ls::config $tc_ -Linked {}'.format(tc['linked']))
1045 if 'AssociatedPhys' in tc:
1046 self._tcl.execute('ls::config $tc_ -AssociatedPhys "{}"'.format(
1047 tc['AssociatedPhys']))
1048 if 'parameters' in tc:
1049 self._configure_parameters(tc['parameters'])
1051 def _configure_parameters(self, params):
1052 self._tcl.execute('set p_ [ls::get $tc_ -children-Parameters(0)]')
1053 for _param_key in sorted(params):
1054 _param_value = params[_param_key]
1055 if isinstance(_param_value, dict):
1056 # Configure complex parameter
1057 if _param_value['class'] in self._class_param_config_handler:
1058 self._class_param_config_handler[_param_value['class']](
1062 # Configure simple parameter
1064 'ls::create {} -under $p_ -Value "{}"'.format(
1068 def _configure_array_param(self, name, params):
1069 self._tcl.execute('ls::create -Array-{} -under $p_ ;'.format(name))
1070 for param in params['array']:
1072 'ls::create ArrayItem -under $p_.{} -Value "{}"'.format(name,
1075 def _configure_test_node_param(self, name, params):
1076 _params = self.DEFAULT_TEST_NODE
1077 _params.update(params)
1079 # TCL command expects lower case 'true' or 'false'
1080 _params['ethStatsEnabled'] = str(_params['ethStatsEnabled']).lower()
1081 _params['uniqueVlanAddr'] = str(_params['uniqueVlanAddr']).lower()
1083 cmd = self.TEST_NODE_CMD.format(name, **_params)
1084 self._tcl.execute(cmd)
1086 def _configure_sut_param(self, name, params):
1088 'ls::create -Sut-{} -under $p_ -Name "{}";'.format(name,
1091 def _configure_dmf_param(self, name, params):
1092 self._tcl.execute('ls::create -Dmf-{} -under $p_ ;'.format(name))
1094 for _flow_index, _flow in enumerate(params['mainflows']):
1095 _lib_id = self._get_library_id(_flow['library'])
1097 'ls::perform AddDmfMainflow $p_.Dmf {} "{}"'.format(
1101 if not params.get('instanceGroups'):
1104 _instance_group = params['instanceGroups'][_flow_index]
1106 # Traffic Mixer parameters handling
1107 for _key in ['mixType', 'rate']:
1108 if _key in _instance_group:
1110 'ls::config $p_.Dmf.InstanceGroup({}) -{} {}'.format(
1111 _flow_index, _key, _instance_group[_key]))
1113 # Assignments parameters handling
1114 for _row_id, _row in enumerate(_instance_group.get('rows', [])):
1116 'ls::config $p_.Dmf.InstanceGroup({}).Row({}) -Node {} '
1117 '-OverridePort {} -ClientPort {} -Context {} -Role {} '
1118 '-PreferredTransport {} -RatingGroup {} '
1119 '-ServiceID {}'.format(
1120 _flow_index, _row_id, _row['node'],
1121 _row['overridePort'], _row['clientPort'],
1122 _row['context'], _row['role'], _row['transport'],
1123 _row['ratingGroup'], _row['serviceId']))
1125 def _configure_reservation(self, reservation):
1126 _ts_id = self.resolve_test_server_name(reservation['tsId'])
1128 'set reservation_ [ls::create Reservation -under $test_]')
1130 'ls::config $reservation_ -TsIndex {} -TsId {} '
1131 '-TsName "{}"'.format(reservation['tsIndex'],
1133 reservation['tsName']))
1134 for _subnet in reservation['phySubnets']:
1136 'set physubnet_ [ls::create PhySubnet -under $reservation_]')
1138 'ls::config $physubnet_ -Name "{}" -Base "{}" -Mask "{}" '
1139 '-NumIps {}'.format(_subnet['name'], _subnet['base'],
1140 _subnet['mask'], _subnet['numIps']))
1142 def _configure_preresolved_arp(self, pre_resolved_arp):
1143 if not pre_resolved_arp: # Pre-resolved ARP configuration not found
1145 for _entry in pre_resolved_arp:
1146 # TsGroup handle name should correspond in _configure_ts_group()
1148 'ls::create PreResolvedArpAddress -under $tss_ '
1149 '-StartingAddress "{StartingAddress}" '
1150 '-NumNodes {NumNodes}'.format(**_entry))
1152 def delete_test_session(self, test_session):
1153 raise NotImplementedError
1155 def _save_test_session(self):
1156 # Call 'Validate' to set default values for missing parameters
1157 res = self._tcl.execute('ls::perform Validate -TestSession $test_')
1158 if res == 'Invalid':
1159 res = self._tcl.execute('ls::get $test_ -ErrorsAndWarnings')
1160 raise exceptions.LandslideTclException(
1161 "Test session validation failed. Server response: {}".format(
1164 self._tcl.execute('ls::save $test_ -overwrite')
1165 LOG.debug("Test session saved successfully.")
1167 def _get_library_id(self, library):
1168 _library_id = self._tcl.execute(
1169 "ls::get [ls::query LibraryInfo -systemLibraryName {}] -Id".format(
1177 _library_id = self._tcl.execute(
1178 "ls::get [ls::query LibraryInfo -userLibraryName {}] -Id".format(
1183 LOG.error("_get_library_id: library='%s' not found.", library)
1184 raise exceptions.LandslideTclException(
1185 "_get_library_id: library='{}' not found.".format(
1190 def resolve_test_server_name(self, ts_name):
1191 return self._tcl.execute("ls::query TsId {}".format(ts_name))
1194 class LsTclHandler(object):
1195 """Landslide TCL Handler class"""
1198 JRE_PATH = net_serv_utils.get_nsb_option('jre_path_i386')
1202 self._ls = LsApi(jre_path=self.JRE_PATH)
1204 "ls::config ApiOptions -NoReturnSuccessResponseString '{}'".format(
1207 def execute(self, command):
1208 res = self._ls.tcl(command)
1209 self.tcl_cmds[command] = res