Support using existing private key when using external heat template file
[yardstick.git] / yardstick / tests / unit / common / test_ansible_common.py
index 07befa6..bf82f62 100644 (file)
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-from __future__ import absolute_import
-
-import os
+import collections
+import shutil
+import subprocess
 import tempfile
-from collections import defaultdict
 
 import mock
-import unittest
-
-from six.moves.configparser import ConfigParser
-from six.moves import StringIO
+from six import moves
+from six.moves import configparser
 
 from yardstick.common import ansible_common
+from yardstick.tests.unit import base as ut_base
 
-PREFIX = 'yardstick.common.ansible_common'
 
+class OverwriteDictTestCase(ut_base.BaseUnitTestCase):
 
-class OverwriteDictTestCase(unittest.TestCase):
     def test_overwrite_dict_cfg(self):
-        c = ConfigParser(allow_no_value=True)
+        c = configparser.ConfigParser(allow_no_value=True)
         d = {
             "section_a": "empty_value",
             "section_b": {"key_c": "Val_d", "key_d": "VAL_D"},
@@ -42,86 +38,78 @@ class OverwriteDictTestCase(unittest.TestCase):
         # Python3 and Python2 convert empty values into None or ''
         # we don't really care but we need to compare correctly for unittest
         self.assertTrue(c.has_option("section_a", "empty_value"))
-        self.assertEqual(sorted(c.items("section_b")), [('key_c', 'Val_d'), ('key_d', 'VAL_D')])
+        self.assertEqual(sorted(c.items("section_b")),
+                         [('key_c', 'Val_d'), ('key_d', 'VAL_D')])
         self.assertTrue(c.has_option("section_c", "key_c"))
         self.assertTrue(c.has_option("section_c", "key_d"))
 
 
-class FilenameGeneratorTestCase(unittest.TestCase):
-    @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
+class FilenameGeneratorTestCase(ut_base.BaseUnitTestCase):
+
+    @mock.patch.object(tempfile, 'NamedTemporaryFile')
     def test__handle_existing_file(self, _):
-        ansible_common.FileNameGenerator._handle_existing_file("/dev/null")
+        ansible_common.FileNameGenerator._handle_existing_file('/dev/null')
 
     def test_get_generator_from_file(self):
-        ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "", "")
+        ansible_common.FileNameGenerator.get_generator_from_filename(
+            '/dev/null', '', '', '')
 
     def test_get_generator_from_file_middle(self):
-        ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "",
-                                                                     "null")
+        ansible_common.FileNameGenerator.get_generator_from_filename(
+            '/dev/null', '', '', 'null')
 
     def test_get_generator_from_file_prefix(self):
-        ansible_common.FileNameGenerator.get_generator_from_filename("/dev/null", "", "null",
-                                                                     "middle")
+        ansible_common.FileNameGenerator.get_generator_from_filename(
+            '/dev/null', '', 'null', 'middle')
 
 
-class AnsibleNodeTestCase(unittest.TestCase):
-    def test_ansible_node(self):
-        ansible_common.AnsibleNode()
+class AnsibleNodeTestCase(ut_base.BaseUnitTestCase):
 
     def test_ansible_node_len(self):
-        a = ansible_common.AnsibleNode()
-        len(a)
+        self.assertEqual(0, len(ansible_common.AnsibleNode()))
 
     def test_ansible_node_repr(self):
-        a = ansible_common.AnsibleNode()
-        repr(a)
+        self.assertEqual('AnsibleNode<{}>', repr(ansible_common.AnsibleNode()))
 
     def test_ansible_node_iter(self):
-        a = ansible_common.AnsibleNode()
-        for _ in a:
-            pass
+        node = ansible_common.AnsibleNode(data={'a': 1, 'b': 2, 'c': 3})
+        for key in node:
+            self.assertIn(key, ('a', 'b', 'c'))
 
     def test_is_role(self):
-        a = ansible_common.AnsibleNode()
-        self.assertFalse(a.is_role("", default="foo"))
+        node = ansible_common.AnsibleNode()
+        self.assertFalse(node.is_role('', default='foo'))
 
     def test_ansible_node_get_tuple(self):
-        a = ansible_common.AnsibleNode({"name": "name"})
-        self.assertEqual(a.get_tuple(), ('name', a))
+        node = ansible_common.AnsibleNode({'name': 'name'})
+        self.assertEqual(node.get_tuple(), ('name', node))
 
     def test_gen_inventory_line(self):
-        a = ansible_common.AnsibleNode(defaultdict(str))
+        a = ansible_common.AnsibleNode(collections.defaultdict(str))
         self.assertEqual(a.gen_inventory_line(), "")
 
     def test_ansible_node_delitem(self):
-        a = ansible_common.AnsibleNode({"name": "name"})
-        del a['name']
+        node = ansible_common.AnsibleNode({'name': 'name'})
+        self.assertEqual(1, len(node))
+        del node['name']
+        self.assertEqual(0, len(node))
 
     def test_ansible_node_getattr(self):
-        a = ansible_common.AnsibleNode({"name": "name"})
-        self.assertEqual(getattr(a, "nosuch", None), None)
+        node = ansible_common.AnsibleNode({'name': 'name'})
+        self.assertIsNone(getattr(node, 'nosuch', None))
 
 
-class AnsibleNodeDictTestCase(unittest.TestCase):
-    def test_ansible_node_dict(self):
-        n = ansible_common.AnsibleNode
-        ansible_common.AnsibleNodeDict(n, {})
+class AnsibleNodeDictTestCase(ut_base.BaseUnitTestCase):
 
     def test_ansible_node_dict_len(self):
         n = ansible_common.AnsibleNode
         a = ansible_common.AnsibleNodeDict(n, {})
-        len(a)
+        self.assertEqual(0, len(a))
 
     def test_ansible_node_dict_repr(self):
         n = ansible_common.AnsibleNode
         a = ansible_common.AnsibleNodeDict(n, {})
-        repr(a)
-
-    def test_ansible_node_dict_iter(self):
-        n = ansible_common.AnsibleNode
-        a = ansible_common.AnsibleNodeDict(n, {})
-        for _ in a:
-            pass
+        self.assertEqual('{}', repr(a))
 
     def test_ansible_node_dict_get(self):
         n = ansible_common.AnsibleNode
@@ -143,12 +131,15 @@ class AnsibleNodeDictTestCase(unittest.TestCase):
                          ["name ansible_ssh_pass=PASS ansible_user=user"])
 
 
-class AnsibleCommonTestCase(unittest.TestCase):
-    def test_get_timeouts(self):
-        self.assertAlmostEqual(ansible_common.AnsibleCommon.get_timeout(-100), 1200.0)
+class AnsibleCommonTestCase(ut_base.BaseUnitTestCase):
+
+    @staticmethod
+    def _delete_tmpdir(dir):
+        shutil.rmtree(dir)
 
-    def test__init__(self):
-        ansible_common.AnsibleCommon({})
+    def test_get_timeouts(self):
+        self.assertAlmostEqual(
+            ansible_common.AnsibleCommon.get_timeout(-100), 1200.0)
 
     def test_reset(self):
         a = ansible_common.AnsibleCommon({})
@@ -183,66 +174,68 @@ class AnsibleCommonTestCase(unittest.TestCase):
         a.deploy_dir = "d"
         self.assertEqual(a.deploy_dir, "d")
 
-    @mock.patch('{}.open'.format(PREFIX))
-    def test__gen_ansible_playbook_file_list(self, _):
+    @mock.patch.object(moves.builtins, 'open')
+    def test__gen_ansible_playbook_file_list(self, *args):
         d = tempfile.mkdtemp()
-        try:
-            a = ansible_common.AnsibleCommon({})
-            a._gen_ansible_playbook_file(["a"], d)
-        finally:
-            os.rmdir(d)
-
-    @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
-    @mock.patch('{}.open'.format(PREFIX))
-    def test__gen_ansible_inventory_file(self, _, __):
+        self.addCleanup(self._delete_tmpdir, d)
+        a = ansible_common.AnsibleCommon({})
+        a._gen_ansible_playbook_file(["a"], d)
+
+    @mock.patch.object(tempfile, 'NamedTemporaryFile')
+    @mock.patch.object(moves.builtins, 'open')
+    def test__gen_ansible_inventory_file(self, *args):
         nodes = [{
             "name": "name", "user": "user", "password": "PASS",
             "role": "role",
         }]
         d = tempfile.mkdtemp()
-        try:
-            a = ansible_common.AnsibleCommon(nodes)
-            a.gen_inventory_ini_dict()
-            inv_context = a._gen_ansible_inventory_file(d)
-            with inv_context:
-                c = StringIO()
-                inv_context.write_func(c)
-                self.assertIn("ansible_ssh_pass=PASS", c.getvalue())
-        finally:
-            os.rmdir(d)
-
-    @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
-    @mock.patch('{}.open'.format(PREFIX))
-    def test__gen_ansible_playbook_file_list_multiple(self, _, __):
+        self.addCleanup(self._delete_tmpdir, d)
+        a = ansible_common.AnsibleCommon(nodes)
+        a.gen_inventory_ini_dict()
+        inv_context = a._gen_ansible_inventory_file(d)
+        with inv_context:
+            c = moves.StringIO()
+            inv_context.write_func(c)
+            self.assertIn("ansible_ssh_pass=PASS", c.getvalue())
+
+    @mock.patch.object(tempfile, 'NamedTemporaryFile')
+    @mock.patch.object(moves.builtins, 'open')
+    def test__gen_ansible_playbook_file_list_multiple(self, *args):
         d = tempfile.mkdtemp()
-        try:
-            a = ansible_common.AnsibleCommon({})
-            a._gen_ansible_playbook_file(["a", "b"], d)
-        finally:
-            os.rmdir(d)
-
-    @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
-    @mock.patch('{}.Popen'.format(PREFIX))
-    @mock.patch('{}.open'.format(PREFIX))
-    def test_do_install_tmp_dir(self, _, mock_popen, __):
+        self.addCleanup(self._delete_tmpdir, d)
+        a = ansible_common.AnsibleCommon({})
+        a._gen_ansible_playbook_file(["a", "b"], d)
+
+    @mock.patch.object(tempfile, 'NamedTemporaryFile')
+    @mock.patch.object(subprocess, 'Popen')
+    @mock.patch.object(moves.builtins, 'open')
+    def test_do_install_tmp_dir(self, _, mock_popen, *args):
         mock_popen.return_value.communicate.return_value = "", ""
         mock_popen.return_value.wait.return_value = 0
         d = tempfile.mkdtemp()
-        try:
-            a = ansible_common.AnsibleCommon({})
-            a.do_install('', d)
-        finally:
-            os.rmdir(d)
-
-    @mock.patch('{}.NamedTemporaryFile'.format(PREFIX))
-    @mock.patch('{}.Popen'.format(PREFIX))
-    @mock.patch('{}.open'.format(PREFIX))
-    def test_execute_ansible_check(self, _, mock_popen, __):
+        self.addCleanup(self._delete_tmpdir, d)
+        a = ansible_common.AnsibleCommon({})
+        a.do_install('', d)
+
+    @mock.patch.object(tempfile, 'NamedTemporaryFile')
+    @mock.patch.object(moves.builtins, 'open')
+    @mock.patch.object(subprocess, 'Popen')
+    def test_execute_ansible_check(self, mock_popen, *args):
         mock_popen.return_value.communicate.return_value = "", ""
         mock_popen.return_value.wait.return_value = 0
         d = tempfile.mkdtemp()
-        try:
-            a = ansible_common.AnsibleCommon({})
-            a.execute_ansible('', d, ansible_check=True, verbose=True)
-        finally:
-            os.rmdir(d)
+        self.addCleanup(self._delete_tmpdir, d)
+        a = ansible_common.AnsibleCommon({})
+        a.execute_ansible('', d, ansible_check=True, verbose=True)
+
+    def test_get_sut_info(self):
+        d = tempfile.mkdtemp()
+        a = ansible_common.AnsibleCommon({})
+        self.addCleanup(self._delete_tmpdir, d)
+        with mock.patch.object(a, '_exec_get_sut_info_cmd'):
+            a.get_sut_info(d)
+
+    def test_get_sut_info_not_exist(self):
+        a = ansible_common.AnsibleCommon({})
+        with self.assertRaises(OSError):
+            a.get_sut_info('/hello/world')