Add API to get the status of async task 79/26679/9
authorchenjiankun <chenjiankun1@huawei.com>
Wed, 4 Jan 2017 17:41:18 +0000 (17:41 +0000)
committerchenjiankun <chenjiankun1@huawei.com>
Tue, 10 Jan 2017 10:03:24 +0000 (10:03 +0000)
JIRA: YARDSTICK-526

Currently there are many API run a task using sub thread.
But we don't know the status of this task.
So we need to offer a API to query the status of this task.

Change-Id: I8d2cc558750bf9270aed4a7abb8bf35d17894d83
Signed-off-by: chenjiankun <chenjiankun1@huawei.com>
14 files changed:
api/database/__init__.py
api/database/handler.py [new file with mode: 0644]
api/database/models.py
api/resources/asynctask.py [new file with mode: 0644]
api/resources/env_action.py
api/server.py
api/urls.py
api/views.py
tests/unit/cmd/commands/__init__.py [new file with mode: 0644]
tests/unit/cmd/commands/test_env.py
tests/unit/common/test_httpClient.py
yardstick/cmd/commands/env.py
yardstick/common/constants.py
yardstick/common/httpClient.py

index bc2708b..5b0bb05 100644 (file)
@@ -21,9 +21,3 @@ db_session = scoped_session(sessionmaker(autocommit=False,
                                          bind=engine))
 Base = declarative_base()
 Base.query = db_session.query_property()
-
-
-def init_db():
-    subclasses = [subclass.__name__ for subclass in Base.__subclasses__()]
-    logger.debug('Import models: %s', subclasses)
-    Base.metadata.create_all(bind=engine)
diff --git a/api/database/handler.py b/api/database/handler.py
new file mode 100644 (file)
index 0000000..f6a2257
--- /dev/null
@@ -0,0 +1,30 @@
+# ############################################################################
+# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+# ############################################################################
+from api.database import db_session
+from api.database.models import AsyncTasks
+
+
+class AsyncTaskHandler(object):
+    def insert(self, kwargs):
+        task = AsyncTasks(**kwargs)
+        db_session.add(task)
+        db_session.commit()
+        return task
+
+    def update_status(self, task, status):
+        task.status = status
+        db_session.commit()
+
+    def update_error(self, task, error):
+        task.error = error
+        db_session.commit()
+
+    def get_task_by_taskid(self, task_id):
+        task = AsyncTasks.query.filter_by(task_id=task_id).first()
+        return task
index 25e3238..2fc141c 100644 (file)
@@ -23,3 +23,14 @@ class Tasks(Base):
 
     def __repr__(self):
         return '<Task %r>' % Tasks.task_id
+
+
+class AsyncTasks(Base):
+    __tablename__ = 'asynctasks'
+    id = Column(Integer, primary_key=True)
+    task_id = Column(String(30))
+    status = Column(Integer)
+    error = Column(String(120))
+
+    def __repr__(self):
+        return '<Task %r>' % AsyncTasks.task_id
diff --git a/api/resources/asynctask.py b/api/resources/asynctask.py
new file mode 100644 (file)
index 0000000..dd2a710
--- /dev/null
@@ -0,0 +1,35 @@
+# ############################################################################
+# Copyright (c) 2017 Huawei Technologies Co.,Ltd and others.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+# ############################################################################
+import uuid
+
+from api.utils import common as common_utils
+from api.database.models import AsyncTasks
+
+
+def default(args):
+    return _get_status(args)
+
+
+def _get_status(args):
+    try:
+        task_id = args['task_id']
+        uuid.UUID(task_id)
+    except KeyError:
+        message = 'measurement and task_id must be provided'
+        return common_utils.error_handler(message)
+
+    asynctask = AsyncTasks.query.filter_by(task_id=task_id).first()
+
+    try:
+        status = asynctask.status
+        error = asynctask.error if asynctask.error else []
+
+        return common_utils.result_handler(status, error)
+    except AttributeError:
+        return common_utils.error_handler('no such task')
index 59a1692..7e24871 100644 (file)
@@ -10,6 +10,7 @@ import logging
 import threading
 import subprocess
 import time
+import uuid
 import json
 import os
 import errno
@@ -23,17 +24,24 @@ from yardstick.common.httpClient import HttpClient
 from api import conf as api_conf
 from api.utils import influx
 from api.utils.common import result_handler
+from api.database.handler import AsyncTaskHandler
 
 logger = logging.getLogger(__name__)
+logger.setLevel(logging.DEBUG)
 
 
 def createGrafanaContainer(args):
-    thread = threading.Thread(target=_create_grafana)
+    task_id = str(uuid.uuid4())
+
+    thread = threading.Thread(target=_create_grafana, args=(task_id,))
     thread.start()
-    return result_handler('success', [])
 
+    return result_handler('success', {'task_id': task_id})
+
+
+def _create_grafana(task_id):
+    _create_task(task_id)
 
-def _create_grafana():
     client = Client(base_url=config.DOCKER_URL)
 
     try:
@@ -48,7 +56,10 @@ def _create_grafana():
         _create_data_source()
 
         _create_dashboard()
+
+        _update_task_status(task_id)
     except Exception as e:
+        _update_task_error(task_id, str(e))
         logger.debug('Error: %s', e)
 
 
@@ -96,12 +107,17 @@ def _check_image_exist(client, t):
 
 
 def createInfluxDBContainer(args):
-    thread = threading.Thread(target=_create_influxdb)
+    task_id = str(uuid.uuid4())
+
+    thread = threading.Thread(target=_create_influxdb, args=(task_id,))
     thread.start()
-    return result_handler('success', [])
+
+    return result_handler('success', {'task_id': task_id})
 
 
-def _create_influxdb():
+def _create_influxdb(task_id):
+    _create_task(task_id)
+
     client = Client(base_url=config.DOCKER_URL)
 
     try:
@@ -116,7 +132,10 @@ def _create_influxdb():
         time.sleep(5)
 
         _config_influxdb()
+
+        _update_task_status(task_id)
     except Exception as e:
+        _update_task_error(task_id, str(e))
         logger.debug('Error: %s', e)
 
 
@@ -160,34 +179,44 @@ def _change_output_to_influxdb():
 
 
 def prepareYardstickEnv(args):
-    thread = threading.Thread(target=_prepare_env_daemon)
+    task_id = str(uuid.uuid4())
+
+    thread = threading.Thread(target=_prepare_env_daemon, args=(task_id,))
     thread.start()
-    return result_handler('success', [])
 
+    return result_handler('success', {'task_id': task_id})
 
-def _prepare_env_daemon():
+
+def _prepare_env_daemon(task_id):
+    _create_task(task_id)
 
     installer_ip = os.environ.get('INSTALLER_IP', 'undefined')
     installer_type = os.environ.get('INSTALLER_TYPE', 'undefined')
 
-    _check_variables(installer_ip, installer_type)
+    try:
+        _check_variables(installer_ip, installer_type)
 
-    _create_directories()
+        _create_directories()
 
-    rc_file = config.OPENSTACK_RC_FILE
+        rc_file = config.OPENSTACK_RC_FILE
 
-    _get_remote_rc_file(rc_file, installer_ip, installer_type)
+        _get_remote_rc_file(rc_file, installer_ip, installer_type)
 
-    _source_file(rc_file)
+        _source_file(rc_file)
 
-    _append_external_network(rc_file)
+        _append_external_network(rc_file)
 
-    # update the external_network
-    _source_file(rc_file)
+        # update the external_network
+        _source_file(rc_file)
 
-    _clean_images()
+        _clean_images()
 
-    _load_images()
+        _load_images()
+
+        _update_task_status(task_id)
+    except Exception as e:
+        _update_task_error(task_id, str(e))
+        logger.debug('Error: %s', e)
 
 
 def _check_variables(installer_ip, installer_type):
@@ -257,3 +286,27 @@ def _load_images():
                          cwd=config.YARDSTICK_REPOS_DIR)
     output = p.communicate()[0]
     logger.debug('The result is: %s', output)
+
+
+def _create_task(task_id):
+    async_handler = AsyncTaskHandler()
+    task_dict = {
+        'task_id': task_id,
+        'status': 0
+    }
+    async_handler.insert(task_dict)
+
+
+def _update_task_status(task_id):
+    async_handler = AsyncTaskHandler()
+
+    task = async_handler.get_task_by_taskid(task_id)
+    async_handler.update_status(task, 1)
+
+
+def _update_task_error(task_id, error):
+    async_handler = AsyncTaskHandler()
+
+    task = async_handler.get_task_by_taskid(task_id)
+    async_handler.update_status(task, 2)
+    async_handler.update_error(task, error)
index fac821b..8cce4de 100644 (file)
@@ -7,13 +7,17 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 import logging
+from itertools import ifilter
+import inspect
 
 from flask import Flask
 from flask_restful import Api
 from flasgger import Swagger
 
-from api.database import init_db
+from api.database import Base
+from api.database import engine
 from api.database import db_session
+from api.database import models
 from api.urls import urlpatterns
 from yardstick import _init_logging
 
@@ -21,8 +25,6 @@ logger = logging.getLogger(__name__)
 
 app = Flask(__name__)
 
-init_db()
-
 Swagger(app)
 
 api = Api(app)
@@ -33,6 +35,21 @@ def shutdown_session(exception=None):
     db_session.remove()
 
 
+def init_db():
+    def func(a):
+        try:
+            if issubclass(a[1], Base):
+                return True
+        except TypeError:
+            pass
+        return False
+
+    subclses = ifilter(func, inspect.getmembers(models, inspect.isclass))
+    logger.debug('Import models: %s', [a[1] for a in subclses])
+    Base.metadata.create_all(bind=engine)
+
+
+init_db()
 reduce(lambda a, b: a.add_resource(b.resource, b.url,
                                    endpoint=b.endpoint) or a, urlpatterns, api)
 
index 0fffd12..273fb40 100644 (file)
@@ -11,6 +11,7 @@ from api.utils.common import Url
 
 
 urlpatterns = [
+    Url('/yardstick/asynctask', views.Asynctask, 'asynctask'),
     Url('/yardstick/testcases/release/action', views.ReleaseAction, 'release'),
     Url('/yardstick/testcases/samples/action', views.SamplesAction, 'samples'),
     Url('/yardstick/results', views.Results, 'results'),
index ee13b47..69ca891 100644 (file)
@@ -24,6 +24,11 @@ TestCaseActionArgsOptsModel = models.TestCaseActionArgsOptsModel
 TestCaseActionArgsOptsTaskArgModel = models.TestCaseActionArgsOptsTaskArgModel
 
 
+class Asynctask(ApiResource):
+    def get(self):
+        return self._dispatch_get()
+
+
 class ReleaseAction(ApiResource):
     @swag_from(os.getcwd() + '/swagger/docs/testcases.yaml')
     def post(self):
diff --git a/tests/unit/cmd/commands/__init__.py b/tests/unit/cmd/commands/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
index af1ab80..e85c367 100644 (file)
@@ -8,17 +8,56 @@
 ##############################################################################
 import unittest
 import mock
+import uuid
 
 from yardstick.cmd.commands.env import EnvCommand
 
 
 class EnvCommandTestCase(unittest.TestCase):
 
-    @mock.patch('yardstick.cmd.commands.env.HttpClient')
-    def test_do_influxdb(self, mock_http_client):
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._start_async_task')
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._check_status')
+    def test_do_influxdb(self, check_status_mock, start_async_task_mock):
         env = EnvCommand()
         env.do_influxdb({})
-        self.assertTrue(mock_http_client().post.called)
+        self.assertTrue(start_async_task_mock.called)
+        self.assertTrue(check_status_mock.called)
+
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._start_async_task')
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._check_status')
+    def test_do_grafana(self, check_status_mock, start_async_task_mock):
+        env = EnvCommand()
+        env.do_grafana({})
+        self.assertTrue(start_async_task_mock.called)
+        self.assertTrue(check_status_mock.called)
+
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._start_async_task')
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._check_status')
+    def test_do_prepare(self, check_status_mock, start_async_task_mock):
+        env = EnvCommand()
+        env.do_prepare({})
+        self.assertTrue(start_async_task_mock.called)
+        self.assertTrue(check_status_mock.called)
+
+    @mock.patch('yardstick.cmd.commands.env.HttpClient.post')
+    def test_start_async_task(self, post_mock):
+        data = {'action': 'createGrafanaContainer'}
+        EnvCommand()._start_async_task(data)
+        self.assertTrue(post_mock.called)
+
+    @mock.patch('yardstick.cmd.commands.env.HttpClient.get')
+    @mock.patch('yardstick.cmd.commands.env.EnvCommand._print_status')
+    def test_check_status(self, print_mock, get_mock):
+        task_id = str(uuid.uuid4())
+        get_mock.return_value = {'status': 2, 'result': 'error'}
+        status = EnvCommand()._check_status(task_id, 'hello world')
+        self.assertEqual(status, 2)
+
+    def test_print_status(self):
+        try:
+            EnvCommand()._print_status('hello', 'word')
+        except Exception as e:
+            self.assertIsInstance(e, IndexError)
 
 
 def main():
index b39dc23..94ac1c8 100644 (file)
@@ -24,6 +24,12 @@ class HttpClientTestCase(unittest.TestCase):
         mock_requests.post.assert_called_with(url, data=json.dumps(data),
                                               headers=headers)
 
+    @mock.patch('yardstick.common.httpClient.requests')
+    def test_get(self, mock_requests):
+        url = 'http://localhost:5000/hello'
+        httpClient.HttpClient().get(url)
+        mock_requests.get.assert_called_with(url)
+
 
 def main():
     unittest.main()
index 098379a..d0fc75d 100644 (file)
@@ -6,13 +6,13 @@
 # which accompanies this distribution, and is available at
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
-import logging
+from __future__ import print_function
+import time
+import os
+import sys
 
 from yardstick.common.httpClient import HttpClient
-from yardstick.common import constants
-
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.DEBUG)
+from yardstick.common import constants as consts
 
 
 class EnvCommand(object):
@@ -21,19 +21,63 @@ class EnvCommand(object):
         Set of commands to prepare environment
     '''
     def do_influxdb(self, args):
-        url = constants.YARDSTICK_ENV_ACTION_API
         data = {'action': 'createInfluxDBContainer'}
-        HttpClient().post(url, data)
-        logger.debug('Now creating and configing influxdb')
+        task_id = self._start_async_task(data)
+
+        start = '* creating influxDB'
+        self._check_status(task_id, start)
 
     def do_grafana(self, args):
-        url = constants.YARDSTICK_ENV_ACTION_API
         data = {'action': 'createGrafanaContainer'}
-        HttpClient().post(url, data)
-        logger.debug('Now creating and configing grafana')
+        task_id = self._start_async_task(data)
+
+        start = '* creating grafana'
+        self._check_status(task_id, start)
 
     def do_prepare(self, args):
-        url = constants.YARDSTICK_ENV_ACTION_API
         data = {'action': 'prepareYardstickEnv'}
-        HttpClient().post(url, data)
-        logger.debug('Now preparing environment')
+        task_id = self._start_async_task(data)
+
+        start = '* preparing yardstick environment'
+        self._check_status(task_id, start)
+
+    def _start_async_task(self, data):
+        url = consts.ENV_ACTION_API
+        return HttpClient().post(url, data)['result']['task_id']
+
+    def _check_status(self, task_id, start):
+        self._print_status(start, '[]\r')
+        url = '{}?task_id={}'.format(consts.ASYNC_TASK_API, task_id)
+
+        CHECK_STATUS_RETRY = 20
+        CHECK_STATUS_DELAY = 5
+
+        for retry in xrange(CHECK_STATUS_RETRY):
+            response = HttpClient().get(url)
+            status = response['status']
+
+            if status:
+                break
+
+            # wait until the async task finished
+            time.sleep(CHECK_STATUS_DELAY * (retry + 1))
+
+        switcher = {
+            0: 'Timeout',
+            1: 'Finished',
+            2: 'Error'
+        }
+        self._print_status(start, '[{}]'.format(switcher[status]))
+        if status == 2:
+            print(response['result'])
+            sys.stdout.flush()
+        return status
+
+    def _print_status(self, s, e):
+        try:
+            columns = int(os.popen('stty size', 'r').read().split()[1])
+            word = '{}{}{}'.format(s, ' ' * (columns - len(s) - len(e)), e)
+            sys.stdout.write(word)
+            sys.stdout.flush()
+        except IndexError:
+            pass
index 705e1ad..174d39b 100644 (file)
@@ -51,4 +51,6 @@ LOAD_IMAGES_SCRIPT = 'tests/ci/load_images.sh'
 
 OPENSTACK_RC_FILE = join(YARDSTICK_CONFIG_DIR, 'openstack.creds')
 
-YARDSTICK_ENV_ACTION_API = 'http://localhost:5000/yardstick/env/action'
+BASE_URL = 'http://localhost:5000'
+ENV_ACTION_API = BASE_URL + '/yardstick/env/action'
+ASYNC_TASK_API = BASE_URL + '/yardstick/asynctask'
index ab2e9a3..6acd030 100644 (file)
@@ -28,3 +28,7 @@ class HttpClient(object):
         except Exception as e:
             logger.debug('Failed: %s', e)
             raise
+
+    def get(self, url):
+        response = requests.get(url)
+        return response.json()