# -*- coding: utf-8 -*-
-
+#
+# Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved.
+#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+#
"""Function for testing collectd plug-ins with different oup plug-ins"""
import time
+import math
-def test_gnocchi_node_sends_data(
- node_id, interval, logger, client, criteria_list=[],
- resource_id_substrings=['']):
- logger.info("Gnocchi test cases will be coming soon!!")
- return False
+def test_snmp_sends_data(
+ compute, interval, logger, client, mib_file=None,
+ mib_strings=None, in_command=None, conf=None):
+ """Check that SNMP deta are updated"""
+ logger.debug('Interval: {}'.format(interval))
+ if mib_file is not None:
+ logger.info(
+ 'Getting SNMP metrics of MIB file {} and '.format(mib_file)
+ + 'following MIB strings: {}...'.format(', '.join(mib_strings)))
+ snmp_metrics = client.get_snmp_metrics(compute, mib_file, mib_strings)
+ if mib_file is None:
+ return len(snmp_metrics) > 1
+ if in_command is not None and conf is not None:
+ conf.execute_command(in_command, compute.get_ip())
+
+ attempt = 1
+ is_passed = False
+ while (attempt <= 10) and not is_passed:
+ is_passed = True
+ # wait Interval time + 2 sec for db update
+ sleep_time = interval + 2
+ if attempt > 1:
+ logger.info('Starting attempt {}'.format(attempt))
+ logger.info(
+ 'Sleeping for {} seconds to get updated entries'.format(sleep_time)
+ + ' (interval is {} sec)...'.format(interval))
+ time.sleep(sleep_time)
+
+ logger.info(
+ 'Getting SNMP metrics of MIB file {} and '.format(mib_file)
+ + 'following MIB strings: {}...'.format(', '.join(mib_strings)))
+ snmp_metrics2 = client.get_snmp_metrics(compute, mib_file, mib_strings)
+ unchanged_snmp_metrics = [
+ snmp_metric for snmp_metric in snmp_metrics
+ if snmp_metrics[snmp_metric] == snmp_metrics2[snmp_metric]]
+ if len(unchanged_snmp_metrics) > 0:
+ logger.error("Following SNMP metrics didn't change: {}".format(
+ ', '.join(unchanged_snmp_metrics)))
+ is_passed = False
+ attempt += 1
+ if not is_passed:
+ logger.warning('After sleep new entries were not found.')
+ if not is_passed:
+ logger.error('This was the last attempt.')
+ return False
+ logger.info('All SNMP metrics are changed.')
+ return True
def test_ceilometer_node_sends_data(
+ 'to interval...')
for metric in plugin_metrics:
logger.debug('{0} {1} {2} ... '.format(metric[0], metric[1], metric[2]))
- if metric[3] - metric[2] != interval:
+ # When there's a small interval or many metrics, there may be a slight
+ # delay in writing the metrics. e.g. a gap of 1.* is okay for an interval of 1
+ if math.floor(metric[3] - metric[2]) > interval + 1:
logger.error(
'Time of last two entries differ by '
+ '{}, but interval is {}'.format(
logger.info('OK')
return True
+
+
+def test_dma_server_set_collectd(compute, file, logger, client):
+ with open(file, mode='w') as f:
+ f.write('# dummy conf\n')
+ res = client.set(file)
+ if res:
+ logger.info('set collectd PASS')
+ else:
+ logger.error('set collectd FAIL')
+
+ return res