Fix Job Status Report 53/41253/2
author    mbeierl <mark.beierl@dell.com>
          Thu, 7 Sep 2017 02:17:52 +0000 (22:17 -0400)
committer mbeierl <mark.beierl@dell.com>
          Thu, 7 Sep 2017 02:21:59 +0000 (22:21 -0400)
Consolidates the two loops that built the job status list and
the list of job workloads into a single function, and uses that
one function to drive both status reporting and workload
execution.

Change-Id: Ia173b8450a857d032a862d03c62bfc1b248583da
JIRA: STORPERF-186
Signed-off-by: mbeierl <mark.beierl@dell.com>
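
The pattern behind this change, as a minimal self-contained sketch: build
the cross-product of run parameters once, then drive both the status map
and the execution loop from that single list. The helper name build_matrix
and the sample workload names below are illustrative only, not the StorPerf
API.

    import itertools

    def build_matrix(job_id, workload_names, queue_depths, block_sizes):
        """Return one entry per (workload, queue-depth, blocksize) combo."""
        matrix = []
        for name, depth, bs in itertools.product(workload_names,
                                                 queue_depths, block_sizes):
            matrix.append({'name': '%s.%s.queue-depth.%s.block-size.%s'
                                   % (job_id, name, depth, bs),
                           'workload_name': name,
                           'queue-depth': depth,
                           'blocksize': bs,
                           'status': 'Pending'})
        return matrix

    workload_status = {}
    matrix = build_matrix('job-1', ['rr', 'rw'], [1, 8], [2048, 16384])

    # One pass marks every planned run as Pending for the status report...
    for entry in matrix:
        workload_status[entry['name']] = entry['status']

    # ...and the same list later drives execution, run by run.
    for entry in matrix:
        workload_status[entry['name']] = 'Running'
        # (execute the run on each slave here)
        workload_status[entry['name']] = 'Completed'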
docker/storperf-master/storperf/test_executor.py

index 0e3fce0..4c2c972 100644
@@ -235,23 +235,91 @@ class TestExecutor(object):
         self.start_time = time.time()
 
         self.workload_status = {}
-        # Prepare stats list
-        for workload_module in self.workload_modules:
-            workload_name = getattr(workload_module, "__name__")
-            blocksizes = self._block_sizes
-            iodepths = self._queue_depths
-            for blocksize in blocksizes:
-                for iodepth in iodepths:
-                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
-                        (self.job_db.job_id, workload_name, iodepth, blocksize)
-                    self.workload_status[name] = "Pending"
+
+        workloads = self._create_workload_matrix()
+
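+        # Execute each workload/queue-depth/blocksize combination in turn.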
+        for current_workload in workloads:
+            workload = current_workload['workload']
+            self._thread_gate = ThreadGate(len(self.slaves),
+                                           workload.options['status-interval'])
+
+            if self._terminated:
+                return
+            self.current_workload = current_workload['name']
+
+            self.logger.info("Starting run %s" % self.current_workload)
+            self.workload_status[self.current_workload] = "Running"
+
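+            # Watchdog: end this run when the deadline (in minutes) expires;
+            # workloads whose names begin with "_" are exempt.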
+            scheduler = sched.scheduler(time.time, time.sleep)
+            if self.deadline is not None \
+                    and not current_workload['workload_name'].startswith("_"):
+                event = scheduler.enter(self.deadline * 60, 1,
+                                        self.terminate_current_run,
+                                        ())
+                t = Thread(target=scheduler.run, args=())
+                t.start()
+
+            workload.options['iodepth'] = str(current_workload['queue-depth'])
+            workload.options['bs'] = str(current_workload['blocksize'])
+
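+            # Fan out: run a copy of the workload on each slave in its own thread.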
+            slave_threads = []
+            for slave in self.slaves:
+                slave_workload = copy.copy(current_workload['workload'])
+                slave_workload.remote_host = slave
+
+                self._workload_executors.append(slave_workload)
+
+                t = Thread(target=self.execute_on_node,
+                           args=(slave_workload,),
+                           name="%s worker" % slave)
+                t.daemon = False
+                t.start()
+                slave_threads.append(t)
+
+            for slave_thread in slave_threads:
+                self.logger.debug("Waiting on %s" % slave_thread)
+                slave_thread.join()
+                self.logger.debug("Done waiting for %s" % slave_thread)
+
+            if not scheduler.empty():
+                try:
+                    scheduler.cancel(event)
+                except ValueError:
+                    pass
+
+            self.logger.info("Completed run %s"
+                             % self.current_workload)
+            self.workload_status[self.current_workload] = "Completed"
+            self._workload_executors = []
+            self.current_workload = None
+
+        self.logger.info("Completed job %s" % (self.job_db.job_id))
+
+        self.end_time = time.time()
+        self._terminated = True
+        self.broadcast_event()
+        self.unregister(data_handler.data_event)
+        report = {'report': json.dumps(self.metadata)}
+        self.job_db.record_workload_params(report)
+        self.job_db.job_id = None
+        if self.result_url is not None:
+            self.logger.info("Results can be found at %s" % self.result_url)
+
+    def _create_workload_matrix(self):
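+        """Build one entry per workload/queue-depth/blocksize run, marked Pending."""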
+        workloads = []
 
         for workload_module in self.workload_modules:
             workload_name = getattr(workload_module, "__name__")
-            self.logger.info("Starting workload %s" % (workload_name))
 
             constructorMethod = getattr(workload_module, workload_name)
             workload = constructorMethod()
+            workload.id = self.job_db.job_id
+
             if (self.filename is not None):
                 workload.filename = self.filename
 
@@ -262,81 +330,26 @@ class TestExecutor(object):
                 iodepths = self._queue_depths
                 blocksizes = self._block_sizes
 
-            workload.id = self.job_db.job_id
-            self._thread_gate = ThreadGate(len(self.slaves),
-                                           workload.options['status-interval'])
-
             for blocksize in blocksizes:
                 for iodepth in iodepths:
 
-                    if self._terminated:
-                        return
-                    self.current_workload = (
-                        "%s.%s.queue-depth.%s.block-size.%s"
-                        % (self.job_db.job_id,
-                           workload_name,
-                           iodepth,
-                           blocksize))
-
-                    self.logger.info("Starting run %s" % self.current_workload)
-                    self.workload_status[self.current_workload] = "Running"
-
-                    scheduler = sched.scheduler(time.time, time.sleep)
-                    if self.deadline is not None \
-                            and not workload_name.startswith("_"):
-                        event = scheduler.enter(self.deadline * 60, 1,
-                                                self.terminate_current_run,
-                                                ())
-                        t = Thread(target=scheduler.run, args=())
-                        t.start()
-
-                    workload.options['iodepth'] = str(iodepth)
-                    workload.options['bs'] = str(blocksize)
-
-                    slave_threads = []
-                    for slave in self.slaves:
-                        slave_workload = copy.copy(workload)
-                        slave_workload.remote_host = slave
-
-                        self._workload_executors.append(slave_workload)
-
-                        t = Thread(target=self.execute_on_node,
-                                   args=(slave_workload,),
-                                   name="%s worker" % slave)
-                        t.daemon = False
-                        t.start()
-                        slave_threads.append(t)
-
-                    for slave_thread in slave_threads:
-                        self.logger.debug("Waiting on %s" % slave_thread)
-                        slave_thread.join()
-                        self.logger.debug("Done waiting for %s" % slave_thread)
-
-                    if not scheduler.empty():
-                        try:
-                            scheduler.cancel(event)
-                        except ValueError:
-                            pass
-
-                    self.logger.info("Completed run %s"
-                                     % self.current_workload)
-                    self.workload_status[self.current_workload] = "Completed"
-                    self._workload_executors = []
-                    self.current_workload = None
-
-            self.logger.info("Completed workload %s" % (workload_name))
-        self.logger.info("Completed job %s" % (self.job_db.job_id))
+                    name = '%s.%s.queue-depth.%s.block-size.%s' % \
+                        (self.job_db.job_id, workload_name, iodepth, blocksize)
+                    self.workload_status[name] = "Pending"
 
-        if self.result_url is not None:
-            self.logger.info("Results can be found at %s" % self.result_url)
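+                    # Bundle everything needed to execute this run later.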
+                    parameters = {'queue-depth': iodepth,
+                                  'blocksize': blocksize,
+                                  'name': name,
+                                  'workload_name': workload_name,
+                                  'status': 'Pending',
+                                  'workload': workload}
 
-        self.end_time = time.time()
-        self._terminated = True
-        self.broadcast_event()
-        self.unregister(data_handler.data_event)
-        report = {'report': json.dumps(self.metadata)}
-        self.job_db.record_workload_params(report)
-        self.job_db.job_id = None
+                    self.logger.info("Workload %s=%s" % (name, parameters))
+
+                    workloads.append(parameters)
+
+        return workloads
 
     def execute_on_node(self, workload):