StorPerf Integration 47/16347/8
authorJingLu5 <lvjing5@huawei.com>
Fri, 29 Jul 2016 06:42:45 +0000 (14:42 +0800)
committerJingLu5 <lvjing5@huawei.com>
Mon, 1 Aug 2016 01:11:40 +0000 (09:11 +0800)
Change-Id: I34a44111078efe50b1dbbaddda72474d25aafe43
Signed-off-by: JingLu5 <lvjing5@huawei.com>
samples/storperf.yaml [new file with mode: 0644]
tests/unit/benchmark/scenarios/storage/test_storperf.py [new file with mode: 0644]
yardstick/benchmark/scenarios/storage/storperf.py [new file with mode: 0644]

diff --git a/samples/storperf.yaml b/samples/storperf.yaml
new file mode 100644 (file)
index 0000000..815ef0d
--- /dev/null
@@ -0,0 +1,31 @@
+---
+# Sample StorPerf benchmark task config file
+# StorPerf is a tool to measure block and object storage performance in an NFVI
+
+schema: "yardstick:task:0.1"
+
+scenarios:
+-
+  type: StorPerf
+  options:
+    agent_count: 1
+    agent_image: "Ubuntu 14.04"
+    public_network: "ext-net"
+    volume_size: 2
+    # target:
+    # deadline:
+    # nossd:
+    # nowarm:
+    block_sizes: "4096"
+    queue_depths: "4"
+    workload: "ws"
+    StorPerf_ip: "192.168.23.2"
+    query_interval: 10
+    timeout: 600
+
+  runner:
+    type: Iteration
+    iterations: 1
+
+context:
+  type: Dummy
diff --git a/tests/unit/benchmark/scenarios/storage/test_storperf.py b/tests/unit/benchmark/scenarios/storage/test_storperf.py
new file mode 100644 (file)
index 0000000..d87ed73
--- /dev/null
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+
+##############################################################################
+# Copyright (c) 2016 Huawei Technologies Co.,Ltd.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+
+# Unittest for yardstick.benchmark.scenarios.storage.storperf.StorPerf
+
+import mock
+import unittest
+import requests
+import json
+
+from yardstick.benchmark.scenarios.storage import storperf
+
+
+def mocked_requests_config_post(*args, **kwargs):
+    class MockResponseConfigPost:
+        def __init__(self, json_data, status_code):
+            self.content = json_data
+            self.status_code = status_code
+
+    return MockResponseConfigPost('{"stack_id": "dac27db1-3502-4300-b301-91c64e6a1622","stack_created": "false"}', 200)
+
+
+def mocked_requests_config_get(*args, **kwargs):
+    class MockResponseConfigGet:
+        def __init__(self, json_data, status_code):
+            self.content = json_data
+            self.status_code = status_code
+
+    return MockResponseConfigGet('{"stack_id": "dac27db1-3502-4300-b301-91c64e6a1622","stack_created": "true"}', 200)
+
+
+def mocked_requests_job_get(*args, **kwargs):
+    class MockResponseJobGet:
+        def __init__(self, json_data, status_code):
+            self.content = json_data
+            self.status_code = status_code
+
+    return MockResponseJobGet('{"_ssd_preconditioning.queue-depth.8.block-size.16384.duration": 6}', 200)
+
+
+def mocked_requests_job_post(*args, **kwargs):
+    class MockResponseJobPost:
+        def __init__(self, json_data, status_code):
+            self.content = json_data
+            self.status_code = status_code
+
+    return MockResponseJobPost('{"job_id": \
+                                 "d46bfb8c-36f4-4a40-813b-c4b4a437f728"}', 200)
+
+
+def mocked_requests_job_delete(*args, **kwargs):
+    class MockResponseJobDelete:
+        def __init__(self, json_data, status_code):
+            self.content = json_data
+            self.status_code = status_code
+
+    return MockResponseJobDelete('{}', 200)
+
+
+def mocked_requests_delete(*args, **kwargs):
+    class MockResponseDelete:
+        def __init__(self, json_data, status_code):
+            self.json_data = json_data
+            self.status_code = status_code
+
+    return MockResponseDelete('{}', 200)
+
+
+def mocked_requests_delete_failed(*args, **kwargs):
+    class MockResponseDeleteFailed:
+        def __init__(self, json_data, status_code):
+            self.json_data = json_data
+            self.status_code = status_code
+
+    if args[0] == "http://172.16.0.137:5000/api/v1.0/configurations":
+        return MockResponseDeleteFailed('{"message": "Teardown failed"}', 400)
+
+    return MockResponseDeleteFailed('{}', 404)
+
+
+class StorPerfTestCase(unittest.TestCase):
+
+    def setUp(self):
+        self.ctx = {
+            'host': {
+                'ip': '172.16.0.137',
+                'user': 'cirros',
+                'key_filename': "mykey.key"
+            }
+        }
+
+        self.result = {}
+
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.post',
+                side_effect=mocked_requests_config_post)
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.get',
+                side_effect=mocked_requests_config_get)
+    def test_successful_setup(self, mock_post, mock_get):
+        options = {
+            "agent_count": 8,
+            "public_network": 'ext-net',
+            "volume_size": 10,
+            "block_sizes": 4096,
+            "queue_depths": 4,
+            "workload": "rs",
+            "StorPerf_ip": "192.168.23.2",
+            "query_interval": 10,
+            "timeout": 60
+        }
+
+        args = {
+            "options": options
+        }
+
+        s = storperf.StorPerf(args, self.ctx)
+
+        s.setup()
+
+        self.assertTrue(s.setup_done)
+
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.post',
+                side_effect=mocked_requests_job_post)
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.get',
+                side_effect=mocked_requests_job_get)
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.delete',
+                side_effect=mocked_requests_job_delete)
+    def test_successful_run(self, mock_post, mock_get, mock_delete):
+        options = {
+            "agent_count": 8,
+            "public_network": 'ext-net',
+            "volume_size": 10,
+            "block_sizes": 4096,
+            "queue_depths": 4,
+            "workload": "rs",
+            "StorPerf_ip": "192.168.23.2",
+            "query_interval": 10,
+            "timeout": 60
+        }
+
+        args = {
+            "options": options
+        }
+
+        s = storperf.StorPerf(args, self.ctx)
+        s.setup_done = True
+
+        sample_output = '{"_ssd_preconditioning.queue-depth.8.block-size.16384.duration": 6}'
+
+        expected_result = json.loads(sample_output)
+
+        s.run(self.result)
+
+        self.assertEqual(self.result, expected_result)
+
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.delete', side_effect=mocked_requests_delete)
+    def test_successful_teardown(self, mock_delete):
+        options = {
+            "agent_count": 8,
+            "public_network": 'ext-net',
+            "volume_size": 10,
+            "block_sizes": 4096,
+            "queue_depths": 4,
+            "workload": "rs",
+            "StorPerf_ip": "192.168.23.2",
+            "query_interval": 10,
+            "timeout": 60
+        }
+
+        args = {
+            "options": options
+        }
+
+        s = storperf.StorPerf(args, self.ctx)
+
+        s.teardown()
+
+        self.assertFalse(s.setup_done)
+
+    @mock.patch('yardstick.benchmark.scenarios.storage.storperf.requests.delete', side_effect=mocked_requests_delete_failed)
+    def test_failed_teardown(self, mock_delete):
+        options = {
+            "agent_count": 8,
+            "public_network": 'ext-net',
+            "volume_size": 10,
+            "block_sizes": 4096,
+            "queue_depths": 4,
+            "workload": "rs",
+            "StorPerf_ip": "192.168.23.2",
+            "query_interval": 10,
+            "timeout": 60
+        }
+
+        args = {
+            "options": options
+        }
+
+        s = storperf.StorPerf(args, self.ctx)
+
+        self.assertRaises(AssertionError, s.teardown(), self.result)
+
+
+def main():
+    unittest.main()
+
+if __name__ == '__main__':
+    main()
diff --git a/yardstick/benchmark/scenarios/storage/storperf.py b/yardstick/benchmark/scenarios/storage/storperf.py
new file mode 100644 (file)
index 0000000..d39c23a
--- /dev/null
@@ -0,0 +1,208 @@
+##############################################################################
+# Copyright (c) 2016 Huawei Technologies Co.,Ltd.
+#
+# All rights reserved. This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+# http://www.apache.org/licenses/LICENSE-2.0
+##############################################################################
+import logging
+import json
+import requests
+import time
+
+from yardstick.benchmark.scenarios import base
+
+LOG = logging.getLogger(__name__)
+
+
+class StorPerf(base.Scenario):
+    """Execute StorPerf benchmark.
+    Once the StorPerf container has been started and the ReST API exposed,
+    you can interact directly with it using the ReST API. StorPerf comes with a
+    Swagger interface that is accessible through the exposed port at:
+    http://StorPerf:5000/swagger/index.html
+
+  Command line options:
+    target = [device or path] (Optional):
+    The path to either an attached storage device (/dev/vdb, etc) or a
+    directory path (/opt/storperf) that will be used to execute the performance
+    test. In the case of a device, the entire device will be used.
+    If not specified, the current directory will be used.
+
+    workload = [workload module] (Optional):
+    If not specified, the default is to run all workloads.
+    The workload types are:
+        rs: 100% Read, sequential data
+        ws: 100% Write, sequential data
+        rr: 100% Read, random access
+        wr: 100% Write, random access
+        rw: 70% Read / 30% write, random access
+
+    nossd (Optional):
+    Do not perform SSD style preconditioning.
+
+    nowarm (Optional):
+    Do not perform a warmup prior to measurements.
+
+    report = [job_id] (Optional):
+    Query the status of the supplied job_id and report on metrics.
+    If a workload is supplied, will report on only that subset.
+
+    """
+    __scenario_type__ = "StorPerf"
+
+    def __init__(self, scenario_cfg, context_cfg):
+        """Scenario construction."""
+        self.scenario_cfg = scenario_cfg
+        self.context_cfg = context_cfg
+        self.options = self.scenario_cfg["options"]
+
+        self.target = self.options.get("StorPerf_ip", None)
+        self.query_interval = self.options.get("query_interval", 10)
+        # Maximum allowed job time
+        self.timeout = self.options.get('timeout', 3600)
+
+        self.setup_done = False
+        self.job_completed = False
+
+    def _query_setup_state(self):
+        """Query the stack status."""
+        LOG.info("Querying the stack state...")
+        setup_query = requests.get('http://%s:5000/api/v1.0/configurations'
+                                   % self.target)
+
+        setup_query_content = json.loads(setup_query.content)
+        if setup_query_content["stack_created"]:
+            self.setup_done = True
+            LOG.debug("stack_created: %s"
+                      % setup_query_content["stack_created"])
+
+    def setup(self):
+        """Set the configuration."""
+        env_args = {}
+        env_args_payload_list = ["agent_count", "public_network",
+                                 "agent_image", "volume_size"]
+
+        for env_argument in env_args_payload_list:
+            if env_argument in self.options:
+                env_args[env_argument] = self.options[env_argument]
+
+        LOG.info("Creating a stack on node %s with parameters %s" %
+                 (self.target, env_args))
+        setup_res = requests.post('http://%s:5000/api/v1.0/configurations'
+                                  % self.target, json=env_args)
+
+        setup_res_content = json.loads(setup_res.content)
+
+        if setup_res.status_code == 400:
+            raise RuntimeError("Failed to create a stack, error message:",
+                               setup_res_content["message"])
+        elif setup_res.status_code == 200:
+            LOG.info("stack_id: %s" % setup_res_content["stack_id"])
+
+            while not self.setup_done:
+                self._query_setup_state()
+                time.sleep(self.query_interval)
+
+    # TODO: Support Storperf job status.
+
+    # def _query_job_state(self, job_id):
+    #     """Query the status of the supplied job_id and report on metrics"""
+    #     LOG.info("Fetching report for %s..." % job_id)
+    #     report_res = requests.get('http://%s:5000/api/v1.0/jobs?id=%s' %
+    #                               (self.target, job_id))
+
+    #     report_res_content = json.loads(report_res.content)
+
+    #     if report_res.status_code == 400:
+    #         raise RuntimeError("Failed to fetch report, error message:",
+    #                            report_res_content["message"])
+    #     else:
+    #         job_status = report_res_content["status"]
+
+    #     LOG.debug("Job is: %s..." % job_status)
+    #     if job_status == "completed":
+    #         self.job_completed = True
+
+        # TODO: Support using StorPerf ReST API to read Job ETA.
+
+        # if job_status == "completed":
+        #     self.job_completed = True
+        #     ETA = 0
+        # elif job_status == "running":
+        #     ETA = report_res_content['time']
+        #
+        # return ETA
+
+    def run(self, result):
+        """Execute StorPerf benchmark"""
+        if not self.setup_done:
+            self.setup()
+
+        job_args = {}
+        job_args_payload_list = ["block_sizes", "queue_depths", "deadline",
+                                 "target", "nossd", "nowarm", "workload"]
+
+        for job_argument in job_args_payload_list:
+            if job_argument in self.options:
+                job_args[job_argument] = self.options[job_argument]
+
+        LOG.info("Starting a job with parameters %s" % job_args)
+        job_res = requests.post('http://%s:5000/api/v1.0/jobs' % self.target,
+                                json=job_args)
+
+        job_res_content = json.loads(job_res.content)
+
+        if job_res.status_code == 400:
+            raise RuntimeError("Failed to start a job, error message:",
+                               job_res_content["message"])
+        elif job_res.status_code == 200:
+            job_id = job_res_content["job_id"]
+            LOG.info("Started job id: %s..." % job_id)
+
+            time.sleep(self.timeout)
+            terminate_res = requests.delete('http://%s:5000/api/v1.0/jobs' %
+                                            self.target)
+
+            if terminate_res.status_code == 400:
+                terminate_res_content = json.loads(terminate_res.content)
+                raise RuntimeError("Failed to start a job, error message:",
+                                   terminate_res_content["message"])
+
+        # TODO: Support Storperf job status.
+
+        #   while not self.job_completed:
+        #       self._query_job_state(job_id)
+        #       time.sleep(self.query_interval)
+
+        # TODO: Support using ETA to polls for completion.
+        #       Read ETA, next poll in 1/2 ETA time slot.
+        #       If ETA is greater than the maximum allowed job time,
+        #       then terminate job immediately.
+
+        #   while not self.job_completed:
+        #       esti_time = self._query_state(job_id)
+        #       if esti_time > self.timeout:
+        #           terminate_res = requests.delete('http://%s:5000/api/v1.0
+        #                                           /jobs' % self.target)
+        #       else:
+        #           time.sleep(int(est_time)/2)
+
+            result_res = requests.get('http://%s:5000/api/v1.0/jobs?id=%s' %
+                                      (self.target, job_id))
+            result_res_content = json.loads(result_res.content)
+
+            result.update(result_res_content)
+
+    def teardown(self):
+        """Deletes the agent configuration and the stack"""
+        teardown_res = requests.delete('http://%s:5000/api/v1.0/\
+                                       configurations' % self.target)
+
+        if teardown_res.status_code == 400:
+            teardown_res_content = json.loads(teardown_res.content)
+            raise RuntimeError("Failed to reset environment, error message:",
+                               teardown_res_content['message'])
+
+        self.setup_done = False