Merge "Suppress log outputs in test_ixnet_api"
[yardstick.git] / yardstick / tests / unit / benchmark / runner / test_proxduration.py
index be1715a..056195f 100644 (file)
@@ -1,23 +1,29 @@
-##############################################################################
-# Copyright (c) 2018 Nokia and others.
+# Copyright (c) 2018 Intel Corporation
 #
-# All rights reserved. This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-# http://www.apache.org/licenses/LICENSE-2.0
-##############################################################################
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 import mock
 import unittest
 import multiprocessing
 import os
-import time
 
 from yardstick.benchmark.runners import proxduration
+from yardstick.common import constants
 from yardstick.common import exceptions as y_exc
 
 
 class ProxDurationRunnerTest(unittest.TestCase):
+
     class MyMethod(object):
         SLA_VALIDATION_ERROR_SIDE_EFFECT = 1
         BROAD_EXCEPTION_SIDE_EFFECT = 2
@@ -69,38 +75,37 @@ class ProxDurationRunnerTest(unittest.TestCase):
     @mock.patch.object(os, 'getpid')
     def test__worker_process_runner_id(self, mock_os_getpid):
         mock_os_getpid.return_value = 101
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), mock.Mock())
 
-        self.assertEqual(self.scenario_cfg['runner']['runner_id'], 101)
+        self.assertEqual(101, self.scenario_cfg['runner']['runner_id'])
 
     def test__worker_process_called_with_cfg(self):
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
 
     def test__worker_process_called_with_cfg_loop(self):
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.01}
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
-        self.assertGreater(self.benchmark.my_method.call_count, 2)
+        self.assertGreater(self.benchmark.my_method.call_count, 0)
 
     def test__worker_process_called_without_cfg(self):
         scenario_cfg = {'runner': {}}
-
         aborted = multiprocessing.Event()
         aborted.set()
-
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 scenario_cfg, {}, aborted, mock.Mock())
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', scenario_cfg, {},
+            aborted, mock.Mock())
 
         self.benchmark_cls.assert_called_once_with(scenario_cfg, {})
         self.benchmark.setup.assert_called_once()
@@ -108,188 +113,174 @@ class ProxDurationRunnerTest(unittest.TestCase):
 
     def test__worker_process_output_queue(self):
         self.benchmark.my_method = mock.Mock(return_value='my_result')
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        output_queue = multiprocessing.Queue()
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), output_queue)
-        time.sleep(0.1)
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        output_queue = mock.Mock()
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), output_queue)
 
         self._assert_defaults__worker_run_setup_and_teardown()
-        self.assertEquals(output_queue.get(), 'my_result')
+        output_queue.put.assert_has_calls(
+            [mock.call('my_result', True, constants.QUEUE_PUT_TIMEOUT)])
 
     def test__worker_process_output_queue_multiple_iterations(self):
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.benchmark.my_method = self.MyMethod()
-
-        output_queue = multiprocessing.Queue()
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), output_queue)
-        time.sleep(0.1)
+        output_queue = mock.Mock()
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), output_queue)
 
         self._assert_defaults__worker_run_setup_and_teardown()
-        self.assertGreater(self.benchmark.my_method.count, 103)
-
-        count = 101
-        while not output_queue.empty():
-            count += 1
-            self.assertEquals(output_queue.get(), count)
+        for idx in range(102, 101 + len(output_queue.method_calls)):
+            output_queue.put.assert_has_calls(
+                [mock.call(idx, True, constants.QUEUE_PUT_TIMEOUT)])
 
     def test__worker_process_queue(self):
         self.benchmark.my_method = self.MyMethod()
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        queue = multiprocessing.Queue()
-        timestamp = time.time()
-        proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
-        time.sleep(0.1)
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        queue = mock.Mock()
+        proxduration._worker_process(
+            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+            multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
-
-        result = queue.get()
-        self.assertGreater(result['timestamp'], timestamp)
-        self.assertEqual(result['errors'], '')
-        self.assertEqual(result['data'], {'my_key': 102})
-        self.assertEqual(result['sequence'], 1)
+        benchmark_output = {'timestamp': mock.ANY,
+                            'sequence': 1,
+                            'data': {'my_key': 102},
+                            'errors': ''}
+        queue.put.assert_has_calls(
+            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
 
     def test__worker_process_queue_multiple_iterations(self):
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.benchmark.my_method = self.MyMethod()
-
-        queue = multiprocessing.Queue()
-        timestamp = time.time()
-        proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
-        time.sleep(0.1)
+        queue = mock.Mock()
+        proxduration._worker_process(
+            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+            multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
-        self.assertGreater(self.benchmark.my_method.count, 103)
-
-        count = 0
-        while not queue.empty():
-            count += 1
-            result = queue.get()
-            self.assertGreater(result['timestamp'], timestamp)
-            self.assertEqual(result['errors'], '')
-            self.assertEqual(result['data'], {'my_key': count + 101})
-            self.assertEqual(result['sequence'], count)
+        for idx in range(102, 101 + len(queue.method_calls)):
+            benchmark_output = {'timestamp': mock.ANY,
+                                'sequence': idx - 101,
+                                'data': {'my_key': idx},
+                                'errors': ''}
+            queue.put.assert_has_calls(
+                [mock.call(benchmark_output, True,
+                           constants.QUEUE_PUT_TIMEOUT)])
 
     def test__worker_process_except_sla_validation_error_no_sla_cfg(self):
         self.benchmark.my_method = mock.Mock(
             side_effect=y_exc.SLAValidationError)
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
 
-    def test__worker_process_except_sla_validation_error_sla_cfg_monitor(self):
+    @mock.patch.object(proxduration.LOG, 'warning')
+    def test__worker_process_except_sla_validation_error_sla_cfg_monitor(
+            self, *args):
         self.scenario_cfg['sla'] = {'action': 'monitor'}
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.benchmark.my_method = mock.Mock(
             side_effect=y_exc.SLAValidationError)
-
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method', self.scenario_cfg,
+            {}, multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
 
     def test__worker_process_raise_sla_validation_error_sla_cfg_default(self):
         self.scenario_cfg['sla'] = {}
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.benchmark.my_method = mock.Mock(
             side_effect=y_exc.SLAValidationError)
-
         with self.assertRaises(y_exc.SLAValidationError):
-            proxduration._worker_process(mock.Mock(), self.benchmark_cls,
-                                     'my_method', self.scenario_cfg, {},
-                                     multiprocessing.Event(), mock.Mock())
+            proxduration._worker_process(
+                mock.Mock(), self.benchmark_cls, 'my_method',
+                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
 
         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
         self.benchmark.setup.assert_called_once()
         self.benchmark.my_method.assert_called_once_with({})
 
     def test__worker_process_raise_sla_validation_error_sla_cfg_assert(self):
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.scenario_cfg['sla'] = {'action': 'assert'}
         self.benchmark.my_method = mock.Mock(
             side_effect=y_exc.SLAValidationError)
 
         with self.assertRaises(y_exc.SLAValidationError):
-            proxduration._worker_process(mock.Mock(), self.benchmark_cls,
-                                     'my_method', self.scenario_cfg, {},
-                                     multiprocessing.Event(), mock.Mock())
+            proxduration._worker_process(
+                mock.Mock(), self.benchmark_cls, 'my_method',
+                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
 
         self.benchmark_cls.assert_called_once_with(self.scenario_cfg, {})
         self.benchmark.setup.assert_called_once()
         self.benchmark.my_method.assert_called_once_with({})
 
-    def test__worker_process_queue_on_sla_validation_error_monitor(self):
+    @mock.patch.object(proxduration.LOG, 'warning')
+    def test__worker_process_queue_on_sla_validation_error_monitor(
+            self, *args):
         self.scenario_cfg['sla'] = {'action': 'monitor'}
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
         self.benchmark.my_method = self.MyMethod(
             side_effect=self.MyMethod.SLA_VALIDATION_ERROR_SIDE_EFFECT)
-
-        queue = multiprocessing.Queue()
-        timestamp = time.time()
-        proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
-        time.sleep(0.1)
+        queue = mock.Mock()
+        proxduration._worker_process(
+            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+            multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
-
-        result = queue.get()
-        self.assertGreater(result['timestamp'], timestamp)
-        self.assertEqual(result['errors'], ('My Case SLA validation failed. '
-                                            'Error: my error message',))
-        self.assertEqual(result['data'], {'my_key': 102})
-        self.assertEqual(result['sequence'], 1)
-
-    def test__worker_process_broad_exception(self):
+        benchmark_output = {'timestamp': mock.ANY,
+                            'sequence': 1,
+                            'data': {'my_key': 102},
+                            'errors': ('My Case SLA validation failed. '
+                                       'Error: my error message', )}
+        queue.put.assert_has_calls(
+            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
+
+    @mock.patch.object(proxduration.LOG, 'exception')
+    def test__worker_process_broad_exception(self, *args):
         self.benchmark.my_method = mock.Mock(
             side_effect=y_exc.YardstickException)
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        proxduration._worker_process(mock.Mock(), self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        proxduration._worker_process(
+            mock.Mock(), self.benchmark_cls, 'my_method',
+            self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
 
         self._assert_defaults__worker_run_setup_and_teardown()
 
-    def test__worker_process_queue_on_broad_exception(self):
+    @mock.patch.object(proxduration.LOG, 'exception')
+    def test__worker_process_queue_on_broad_exception(self, *args):
         self.benchmark.my_method = self.MyMethod(
             side_effect=self.MyMethod.BROAD_EXCEPTION_SIDE_EFFECT)
-
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
-        queue = multiprocessing.Queue()
-        timestamp = time.time()
-        proxduration._worker_process(queue, self.benchmark_cls, 'my_method',
-                                 self.scenario_cfg, {},
-                                 multiprocessing.Event(), mock.Mock())
-        time.sleep(0.1)
-
-        self._assert_defaults__worker_run_setup_and_teardown()
-
-        result = queue.get()
-        self.assertGreater(result['timestamp'], timestamp)
-        self.assertNotEqual(result['errors'], '')
-        self.assertEqual(result['data'], {'my_key': 102})
-        self.assertEqual(result['sequence'], 1)
-
-    def test__worker_process_benchmark_teardown_on_broad_exception(self):
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
+        queue = mock.Mock()
+        proxduration._worker_process(
+            queue, self.benchmark_cls, 'my_method', self.scenario_cfg, {},
+            multiprocessing.Event(), mock.Mock())
+
+        benchmark_output = {'timestamp': mock.ANY,
+                            'sequence': 1,
+                            'data': {'my_key': 102},
+                            'errors': mock.ANY}
+        queue.put.assert_has_calls(
+            [mock.call(benchmark_output, True, constants.QUEUE_PUT_TIMEOUT)])
+
+    @mock.patch.object(proxduration.LOG, 'exception')
+    def test__worker_process_benchmark_teardown_on_broad_exception(
+            self, *args):
         self.benchmark.teardown = mock.Mock(
             side_effect=y_exc.YardstickException)
-
-        self.scenario_cfg["runner"] = {"sampled": True, "duration": 1}
+        self.scenario_cfg["runner"] = {"sampled": True, "duration": 0.1}
 
         with self.assertRaises(SystemExit) as raised:
-            proxduration._worker_process(mock.Mock(), self.benchmark_cls,
-                                     'my_method', self.scenario_cfg, {},
-                                     multiprocessing.Event(), mock.Mock())
-        self.assertEqual(raised.exception.code, 1)
+            proxduration._worker_process(
+                mock.Mock(), self.benchmark_cls, 'my_method',
+                self.scenario_cfg, {}, multiprocessing.Event(), mock.Mock())
+        self.assertEqual(1, raised.exception.code)
         self._assert_defaults__worker_run_setup_and_teardown()