add unittest framework for supporting unittest in testAPI 35/14235/1
authorSerenaFeng <feng.xiaowei@zte.com.cn>
Tue, 17 May 2016 13:05:27 +0000 (21:05 +0800)
committerSerenaFeng <feng.xiaowei@zte.com.cn>
Tue, 17 May 2016 13:12:20 +0000 (21:12 +0800)
usage is shown in utils/test/result_collection_api/README.md

JIRA: FUNCTEST-251

Change-Id: I788417e296c153cc485f4a4064697bdafc394e5b
Signed-off-by: SerenaFeng <feng.xiaowei@zte.com.cn>
utils/test/result_collection_api/README.md [new file with mode: 0644]
utils/test/result_collection_api/run_test.sh [new file with mode: 0644]
utils/test/result_collection_api/tests/__init__.py [new file with mode: 0644]
utils/test/result_collection_api/tests/unit/__init__.py [new file with mode: 0644]
utils/test/result_collection_api/tests/unit/fake_pymongo.py [new file with mode: 0644]
utils/test/result_collection_api/tests/unit/test_base.py [new file with mode: 0644]
utils/test/result_collection_api/tests/unit/test_fake_pymongo.py [new file with mode: 0644]
utils/test/result_collection_api/tests/unit/test_version.py [new file with mode: 0644]

diff --git a/utils/test/result_collection_api/README.md b/utils/test/result_collection_api/README.md
new file mode 100644 (file)
index 0000000..d73274c
--- /dev/null
@@ -0,0 +1,16 @@
+# result_collection_api
+
+## prepare:
+Install:
+
+```
+pip install testtools
+pip install discover
+```
+
+## How to:
+run_test.sh:
+
+```
+bash ./run_test.sh
+```
diff --git a/utils/test/result_collection_api/run_test.sh b/utils/test/result_collection_api/run_test.sh
new file mode 100644 (file)
index 0000000..6006fcf
--- /dev/null
@@ -0,0 +1,10 @@
+#! /bin/bash
+
+# Before run this script, make sure that testtools and discover
+# had been installed in your env
+# or else using pip to install them as follows:
+# pip install testtools, discover
+
+find . -type f -name "*.pyc" -delete
+testrargs="discover ./tests/unit"
+python -m testtools.run $testrargs
\ No newline at end of file
diff --git a/utils/test/result_collection_api/tests/__init__.py b/utils/test/result_collection_api/tests/__init__.py
new file mode 100644 (file)
index 0000000..3ed9fd0
--- /dev/null
@@ -0,0 +1 @@
+__author__ = 'root'
diff --git a/utils/test/result_collection_api/tests/unit/__init__.py b/utils/test/result_collection_api/tests/unit/__init__.py
new file mode 100644 (file)
index 0000000..3ed9fd0
--- /dev/null
@@ -0,0 +1 @@
+__author__ = 'root'
diff --git a/utils/test/result_collection_api/tests/unit/fake_pymongo.py b/utils/test/result_collection_api/tests/unit/fake_pymongo.py
new file mode 100644 (file)
index 0000000..e2db460
--- /dev/null
@@ -0,0 +1,129 @@
+from bson.objectid import ObjectId
+from concurrent.futures import ThreadPoolExecutor
+
+__author__ = 'serena'
+
+
+class MemCursor(object):
+    def __init__(self, collection):
+        self.collection = collection
+        self.count = len(self.collection)
+
+    def _is_next_exist(self):
+        return self.count != 0
+
+    @property
+    def fetch_next(self):
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            result = executor.submit(self._is_next_exist)
+        return result
+
+    def next_object(self):
+        self.count -= 1
+        return self.collection.pop()
+
+
+class MemDb(object):
+
+    def __init__(self):
+        self.contents = []
+        pass
+
+    def _find_one(self, spec_or_id=None, *args):
+        if spec_or_id is not None and not isinstance(spec_or_id, dict):
+            spec_or_id = {"_id": spec_or_id}
+        cursor = self._find(spec_or_id, *args)
+        for result in cursor:
+            return result
+        return None
+
+    def find_one(self, spec_or_id=None, *args):
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            result = executor.submit(self._find_one, spec_or_id, *args)
+        return result
+
+    def _insert(self, doc_or_docs):
+
+        docs = doc_or_docs
+        return_one = False
+        if isinstance(docs, dict):
+            return_one = True
+            docs = [docs]
+
+        ids = []
+        for doc in docs:
+            if '_id' not in doc:
+                doc['_id'] = ObjectId()
+            if not self._find_one(doc['_id']):
+                ids.append(doc['_id'])
+                self.contents.append(doc_or_docs)
+
+        if len(ids) == 0:
+            return None
+        if return_one:
+            return ids[0]
+        else:
+            return ids
+
+    def insert(self, doc_or_docs):
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            result = executor.submit(self._insert, doc_or_docs)
+        return result
+
+    @staticmethod
+    def _in(content, *args):
+        for arg in args:
+            for k, v in arg.iteritems():
+                if content.get(k, None) != v:
+                    return False
+
+        return True
+
+    def _find(self, *args):
+        res = []
+        for content in self.contents:
+            if self._in(content, *args):
+                res.append(content)
+
+        return res
+
+    def find(self, *args):
+        return MemCursor(self._find(*args))
+
+    def _update(self, spec, document):
+        updated = False
+        for index in range(len(self.contents)):
+            content = self.contents[index]
+            if self._in(content, spec):
+                for k, v in document.iteritems():
+                    updated = True
+                    content[k] = v
+            self.contents[index] = content
+        return updated
+
+    def update(self, spec, document):
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            result = executor.submit(self._update, spec, document)
+        return result
+
+    def _remove(self, spec_or_id=None):
+        if spec_or_id is None:
+            self.contents = []
+        if not isinstance(spec_or_id, dict):
+            spec_or_id = {'_id': spec_or_id}
+        for index in range(len(self.contents)):
+            content = self.contents[index]
+            if self._in(content, spec_or_id):
+                del self.contents[index]
+                return True
+        return False
+
+    def remove(self, spec_or_id=None):
+        with ThreadPoolExecutor(max_workers=2) as executor:
+            result = executor.submit(self._remove, spec_or_id)
+        return result
+
+pod = MemDb()
+test_projects = MemDb()
+test_cases = MemDb()
+test_results = MemDb()
diff --git a/utils/test/result_collection_api/tests/unit/test_base.py b/utils/test/result_collection_api/tests/unit/test_base.py
new file mode 100644 (file)
index 0000000..b72436e
--- /dev/null
@@ -0,0 +1,36 @@
+from tornado.web import Application
+from tornado.testing import AsyncHTTPTestCase
+
+from resources.handlers import VersionHandler, PodHandler, \
+    TestProjectHandler, TestCasesHandler, TestResultsHandler, DashboardHandler
+import fake_pymongo
+
+
+class TestBase(AsyncHTTPTestCase):
+    def get_app(self):
+        return Application(
+            [
+                (r"/version", VersionHandler),
+                (r"/pods", PodHandler),
+                (r"/pods/([^/]+)", PodHandler),
+                (r"/test_projects", TestProjectHandler),
+                (r"/test_projects/([^/]+)", TestProjectHandler),
+                (r"/test_projects/([^/]+)/cases", TestCasesHandler),
+                (r"/test_projects/([^/]+)/cases/([^/]+)", TestCasesHandler),
+                (r"/results", TestResultsHandler),
+                (r"/results([^/]*)", TestResultsHandler),
+                (r"/results/([^/]*)", TestResultsHandler),
+                (r"/dashboard", DashboardHandler),
+                (r"/dashboard([^/]*)", DashboardHandler),
+                (r"/dashboard/([^/]*)", DashboardHandler),
+            ],
+            db=fake_pymongo,
+            debug=True,
+        )
+
+    def tearDown(self):
+        yield fake_pymongo.pod.remove()
+        yield fake_pymongo.test_projects.remove()
+        yield fake_pymongo.test_cases.remove()
+        yield fake_pymongo.test_results.remove()
+        super(TestBase, self).tearDown()
diff --git a/utils/test/result_collection_api/tests/unit/test_fake_pymongo.py b/utils/test/result_collection_api/tests/unit/test_fake_pymongo.py
new file mode 100644 (file)
index 0000000..5ddbf28
--- /dev/null
@@ -0,0 +1,52 @@
+import unittest
+from tornado.web import Application
+from tornado import gen
+from tornado.testing import AsyncHTTPTestCase, gen_test
+
+import fake_pymongo
+
+
+class MyTest(AsyncHTTPTestCase):
+    def setUp(self):
+        super(MyTest, self).setUp()
+        self.db = fake_pymongo
+        self.io_loop.run_sync(self.fixture_setup)
+
+    def get_app(self):
+        return Application()
+
+    @gen.coroutine
+    def fixture_setup(self):
+        self.test1 = {'_id': '1', 'name': 'test1'}
+        self.test2 = {'name': 'test2'}
+        yield self.db.pod.insert({'_id': '1', 'name': 'test1'})
+        yield self.db.pod.insert({'name': 'test2'})
+
+    @gen_test
+    def test_find_one(self):
+        user = yield self.db.pod.find_one({'name': 'test1'})
+        self.assertEqual(user, self.test1)
+
+    @gen_test
+    def test_find(self):
+        cursor = self.db.pod.find()
+        names = []
+        while (yield cursor.fetch_next):
+            ob = cursor.next_object()
+            names.append(ob.get('name'))
+        self.assertItemsEqual(names, ['test1', 'test2'])
+
+    @gen_test
+    def test_update(self):
+        yield self.db.pod.update({'_id': '1'}, {'name': 'new_test1'})
+        user = yield self.db.pod.find_one({'_id': '1'})
+        self.assertEqual(user.get('name', None), 'new_test1')
+
+    @gen_test
+    def test_remove(self):
+        yield self.db.pod.remove({'_id': '1'})
+        user = yield self.db.pod.find_one({'_id': '1'})
+        self.assertIsNone(user)
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/utils/test/result_collection_api/tests/unit/test_version.py b/utils/test/result_collection_api/tests/unit/test_version.py
new file mode 100644 (file)
index 0000000..918f2f0
--- /dev/null
@@ -0,0 +1,14 @@
+import unittest
+
+from test_base import TestBase
+
+__author__ = 'serena'
+
+
+class TestVersion(TestBase):
+    def test_get_version(self):
+        response = self.fetch('/version')
+        self.assertEqual(response.code, 200)
+
+if __name__ == '__main__':
+    unittest.main()