Change to Paramiko 33/25433/2
authorMark Beierl <mark.beierl@dell.com>
Fri, 2 Dec 2016 16:19:54 +0000 (11:19 -0500)
committerMark Beierl <mark.beierl@dell.com>
Fri, 2 Dec 2016 16:40:10 +0000 (11:40 -0500)
Use Paramiko ssh client instead of invoking ssh and scp
from the command line

Change-Id: Ibc8395b98842fd7f40b49c4dafa2688d8e64abc7
JIRA: STORPERF-91
Signed-off-by: Mark Beierl <mark.beierl@dell.com>
ci/verify.sh
docker/Dockerfile
docker/requirements.pip
storperf/carbon/emitter.py
storperf/fio/fio_invoker.py
storperf/logging.json
storperf/storperf_master.py

index e87c757..22d0186 100755 (executable)
@@ -32,6 +32,7 @@ pip install html2text==2016.1.8
 pip install matplotlib==1.3.1
 pip install mock==1.3.0
 pip install nose==1.3.7
+pip install paramiko==2.0.2
 pip install python-cinderclient==1.6.0
 pip install python-glanceclient==1.1.0
 pip install python-heatclient==0.8.0
@@ -40,6 +41,7 @@ pip install python-neutronclient==2.6.0
 pip install python-novaclient==2.28.1
 pip install pyyaml==3.10
 pip install requests==2.9.1
+pip install scp==0.10.2
 pip install six==1.10.0
 
 python ci/setup.py develop
index 449ad70..d8abf63 100644 (file)
@@ -36,6 +36,8 @@ libaio-dev \
 zlib1g-dev \
 supervisor \
 ssh \
+libssl-dev \
+libffi-dev \
 rsync \
 git \
 wget \
index 8efaf24..69f4ab2 100644 (file)
@@ -11,3 +11,5 @@ flask-restful==0.3.5
 flask-restful-swagger==0.19
 flask-swagger==0.2.12
 html2text==2016.1.8
+paramiko=2.0.2
+scp==0.10.2
index 6104fd4..c9af8a6 100644 (file)
@@ -35,3 +35,5 @@ class CarbonMetricTransmitter():
             carbon_socket.send(message + '\n')
 
         carbon_socket.close()
+        self.logger.info("Sent metrics to carbon with timestamp %s"
+                         % timestamp)
index 59dbdaf..315b243 100644 (file)
@@ -11,6 +11,7 @@ import json
 import logging
 import subprocess
 from threading import Thread
+import paramiko
 
 
 class FIOInvoker(object):
@@ -37,10 +38,11 @@ class FIOInvoker(object):
     def unregister(self, event_listener):
         self.event_listeners.discard(event_listener)
 
-    def stdout_handler(self):
+    def stdout_handler(self, stdout):
+        self.logger.debug("Started")
         self.json_body = ""
         try:
-            for line in iter(self.fio_process.stdout.readline, b''):
+            for line in iter(stdout.readline, b''):
                 if line.startswith("fio"):
                     line = ""
                     continue
@@ -54,52 +56,53 @@ class FIOInvoker(object):
                             try:
                                 event_listener(self.callback_id, json_metric)
                             except Exception, e:
-                                self.logger.error("Notifying listener %s: %s",
-                                                  self.callback_id, e)
+                                self.logger.exception(
+                                    "Notifying listener %s: %s",
+                                    self.callback_id, e)
+                            self.logger.info(
+                                "Event listener callback complete")
                 except Exception, e:
                     self.logger.error("Error parsing JSON: %s", e)
         except ValueError:
             pass  # We might have read from the closed socket, ignore it
 
-        self.fio_process.stdout.close()
+        stdout.close()
+        self.logger.debug("Finished")
 
-    def stderr_handler(self):
-        for line in iter(self.fio_process.stderr.readline, b''):
-            self.logger.error("FIO Error: %s", line)
+    def stderr_handler(self, stderr):
+        self.logger.debug("Started")
+        for line in iter(stderr.readline, b''):
+            self.logger.error("FIO Error: %s", line.rstrip())
 
-        self.fio_process.stderr.close()
+        stderr.close()
+        self.logger.debug("Finished")
 
     def execute(self, args=[]):
-        self.logger.debug("FIO args " + str(args))
-
-        if (self.remote_host is None):
-            cmd = "fio"
-        else:
-            cmd = "ssh"
-            additional_args = ['-o', 'StrictHostKeyChecking=no',
-                               '-o', 'UserKnownHostsFile=/dev/null',
-                               '-o', 'LogLevel=error',
-                               '-i', 'storperf/resources/ssh/storperf_rsa',
-                               'storperf@' + self.remote_host,
-                               "sudo", "./fio"]
-            args = additional_args + args
-
-        self.fio_process = subprocess.Popen([cmd] + args,
-                                            universal_newlines=True,
-                                            stdout=subprocess.PIPE,
-                                            stderr=subprocess.PIPE)
-
-        t = Thread(target=self.stdout_handler, args=())
-        t.daemon = True
-        t.start()
-
-        t = Thread(target=self.stderr_handler, args=())
-        t.daemon = True
-        t.start()
-
-        self.logger.debug("Started fio on " + self.remote_host)
-        t.join()
-        self.logger.debug("Finished fio on " + self.remote_host)
+
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(self.remote_host, username='storperf',
+                    key_filename='storperf/resources/ssh/storperf_rsa',
+                    timeout=2)
+
+        command = "sudo ./fio " + ' '.join(args)
+        self.logger.debug("Remote command: %s" % command)
+        (_, stdout, stderr) = ssh.exec_command(command)
+
+        tout = Thread(target=self.stdout_handler, args=(stdout,),
+                      name="%s stdout" % self._remote_host)
+        tout.daemon = True
+        tout.start()
+
+        terr = Thread(target=self.stderr_handler, args=(stderr,),
+                      name="%s stderr" % self._remote_host)
+        terr.daemon = True
+        terr.start()
+
+        self.logger.info("Started fio on " + self.remote_host)
+        terr.join()
+        tout.join()
+        self.logger.info("Finished fio on " + self.remote_host)
 
     def terminate(self):
         self.logger.debug("Terminating fio on " + self.remote_host)
index 6168717..6d6026e 100644 (file)
@@ -3,7 +3,7 @@
     "disable_existing_loggers": false,
     "formatters": {
         "simple": {
-            "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+            "format": "%(asctime)s - %(threadName)s - %(name)s - %(levelname)s - %(message)s"
         }
     },
 
     "storperf": {
         "level": "DEBUG",
         "handlers": ["console", "file_handler", "error_file_handler"]
+    },
+
+    "storperf.carbon.emitter": {
+        "level": "INFO",
+        "handlers": ["console", "file_handler", "error_file_handler"]
     }
+
 }
\ No newline at end of file
index 99df47f..35cba72 100644 (file)
@@ -11,16 +11,18 @@ from datetime import datetime
 import logging
 import os
 import socket
-import subprocess
 from threading import Thread
 from time import sleep
 
+import paramiko
+from scp import SCPClient
+
 import cinderclient.v2 as cinderclient
-from db.configuration_db import ConfigurationDB
-from db.job_db import JobDB
 import heatclient.client as heatclient
 import keystoneclient.v2_0 as ksclient
+from storperf.db.configuration_db import ConfigurationDB
 from storperf.db.graphite_db import GraphiteDB
+from storperf.db.job_db import JobDB
 from test_executor import TestExecutor
 
 
@@ -359,63 +361,26 @@ class StorPerfMaster(object):
                 alive = True
                 logger.debug("Slave " + slave + " is alive and ready")
 
-        args = ['scp', '-o', 'StrictHostKeyChecking=no',
-                '-o', 'UserKnownHostsFile=/dev/null',
-                '-o', 'LogLevel=error',
-                '-i', 'storperf/resources/ssh/storperf_rsa',
-                '/lib/x86_64-linux-gnu/libaio.so.1',
-                'storperf@' + slave + ":"]
-
-        logger.debug(args)
-        proc = subprocess.Popen(args,
-                                universal_newlines=True,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.PIPE)
-
-        (stdout, stderr) = proc.communicate()
-        if (len(stdout) > 0):
-            logger.debug(stdout.decode('utf-8').strip())
-        if (len(stderr) > 0):
-            logger.error(stderr.decode('utf-8').strip())
-
-        args = ['scp', '-o', 'StrictHostKeyChecking=no',
-                '-o', 'UserKnownHostsFile=/dev/null',
-                '-o', 'LogLevel=error',
-                '-i', 'storperf/resources/ssh/storperf_rsa',
-                '/usr/local/bin/fio',
-                'storperf@' + slave + ":"]
-
-        logger.debug(args)
-        proc = subprocess.Popen(args,
-                                universal_newlines=True,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.PIPE)
-
-        (stdout, stderr) = proc.communicate()
-        if (len(stdout) > 0):
-            logger.debug(stdout.decode('utf-8').strip())
-        if (len(stderr) > 0):
-            logger.error(stderr.decode('utf-8').strip())
-
-        args = ['ssh', '-o', 'StrictHostKeyChecking=no',
-                '-o', 'UserKnownHostsFile=/dev/null',
-                '-o', 'LogLevel=error',
-                '-i', 'storperf/resources/ssh/storperf_rsa',
-                'storperf@' + slave,
-                'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
-                ]
-
-        logger.debug(args)
-        proc = subprocess.Popen(args,
-                                universal_newlines=True,
-                                stdout=subprocess.PIPE,
-                                stderr=subprocess.PIPE)
-
-        (stdout, stderr) = proc.communicate()
-        if (len(stdout) > 0):
-            logger.debug(stdout.decode('utf-8').strip())
-        if (len(stderr) > 0):
-            logger.error(stderr.decode('utf-8').strip())
+        ssh = paramiko.SSHClient()
+        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+        ssh.connect(slave, username='storperf',
+                    key_filename='storperf/resources/ssh/storperf_rsa',
+                    timeout=2)
+
+        scp = SCPClient(ssh.get_transport())
+        logger.debug("Transferring libaio.so.1 to %s" % slave)
+        scp.put('/lib/x86_64-linux-gnu/libaio.so.1', '~/')
+        logger.debug("Transferring fio to %s" % slave)
+        scp.put('/usr/local/bin/fio', '~/')
+
+        cmd = 'sudo cp -v libaio.so.1 /lib/x86_64-linux-gnu/libaio.so.1'
+        logger.debug("Executing on %s: %s" % (slave, cmd))
+        (_, stdout, stderr) = ssh.exec_command(cmd)
+
+        for line in stdout.readlines():
+            logger.debug(line.decode('utf-8').strip())
+        for line in stderr.readlines():
+            logger.error(line.decode('utf-8').strip())
 
     def _make_parameters(self):
         heat_parameters = {}