Upload the contribution of vstf as bottleneck network framework.
[bottlenecks.git] / vstf / vstf / controller / api_server.py
diff --git a/vstf/vstf/controller/api_server.py b/vstf/vstf/controller/api_server.py
new file mode 100755 (executable)
index 0000000..d354701
--- /dev/null
@@ -0,0 +1,403 @@
+import uuid
+import time
+import os
+import sys
+import logging
+import signal
+import json
+
+from vstf.common import unix, message, cliutil, excepts
+from vstf.common.vstfcli import VstfParser
+from vstf.common.log import setup_logging
+from vstf.common import daemon
+from vstf.rpc_frame_work import rpc_producer
+from vstf.controller.fabricant import Fabricant
+from vstf.agent.env.basic.commandline import CommandLine
+from vstf.controller.env_build.env_build import EnvBuildApi as Builder
+from vstf.controller.env_build.env_collect import EnvCollectApi
+from vstf.controller.database.dbinterface import DbManage
+import vstf.controller.sw_perf.performance as pf
+from vstf.controller.settings.tester_settings import TesterSettings
+from vstf.controller.settings.device_settings import DeviceSettings
+from vstf.controller.settings.flows_settings import FlowsSettings
+from vstf.controller.settings.mail_settings import MailSettings
+from vstf.controller.settings.tool_settings import ToolSettings
+from vstf.controller.settings.perf_settings import PerfSettings
+from vstf.controller.sw_perf.perf_provider import PerfProvider
+from vstf.controller.sw_perf.flow_producer import FlowsProducer
+import vstf.controller.reporters.reporter as rp
+import vstf.common.constants as cst
+import vstf.common.check as chk
+
+LOG = logging.getLogger(__name__)
+cmd = CommandLine()
+
+
+class OpsChains(object):
+    def __init__(self, monitor, port):
+        """The ops chains will setup the proxy to rabbitmq
+        and setup a thread to watch the queues of rabbitmq
+        
+        """
+        super(OpsChains, self).__init__()
+        if not os.path.exists(cst.VSTFCPATH):
+            os.mkdir(cst.VSTFCPATH)
+
+        LOG.info("VSTF Manager start to listen to %s", monitor)
+        self.chanl = rpc_producer.Server(host=monitor, port=port)
+        self.dbconn = DbManage()
+        self.collection = EnvCollectApi(self.chanl)
+
+    def list_devs(self, **kwargs):
+        target = kwargs.get('host')
+        if not target:
+            respond = "the target is empty, not support now."
+        else:
+            respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
+        return respond
+
+    def src_install(self, host, config_file):
+        if not os.path.exists(config_file):
+            raise Exception("Can not found the config file.")
+        cfg = json.load(open(config_file))
+        msg = self.chanl.make_msg("src_install", cfg=cfg)
+        return self.chanl.call(msg, host, timeout=1000)
+
+    def create_images(self, host, config_file):
+        if not os.path.exists(config_file):
+            raise Exception("Can not found the config file.")
+        cfg = json.load(open(config_file))
+        msg = self.chanl.make_msg("create_images", cfg=cfg)
+        return self.chanl.call(msg, host, timeout=1000)
+
+    def clean_images(self, host, config_file):
+        if not os.path.exists(config_file):
+            raise Exception("Can not found the config file.")
+        cfg = json.load(open(config_file))
+        msg = self.chanl.make_msg("clean_images", cfg=cfg)
+        return self.chanl.call(msg, host, timeout=1000)
+
+    def apply_model(self, host, model=None, config_file=None):
+        if config_file is None:
+            config_file = "/etc/vstf/env/%s.json" % model
+        if not os.path.exists(config_file):
+            raise Exception("Can not found the config file.")
+        env = Builder(self.chanl, config_file)
+        ret = env.build()
+        return ret
+
+    def disapply_model(self, host, model=None, config_file=None):
+        if config_file is None:
+            config_file = "/etc/vstf/env/%s.json" % model
+        if not os.path.exists(config_file):
+            raise Exception("Can not found the config file.")
+        env = Builder(self.chanl, config_file)
+        ret = env.clean()
+        return ret
+
+    def list_tasks(self):
+        ret = self.dbconn.query_tasks()
+        head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
+        if ret:
+            ret = head + ret
+        return ret
+
+    def affctl_list(self, host):
+        if not host:
+            return "Need input the host"
+        return Fabricant(host, self.chanl).affctl_list()
+
+    def _create_task(self, scenario):
+        taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(cst.TIME_FORMAT),
+                                         desc=scenario + "Test")
+        LOG.info("new Task id:%s" % taskid)
+        if -1 == taskid:
+            raise Exception("DB create task failed.")
+
+        device = DeviceSettings().settings
+        hosts = [device["host"], device["tester"]]
+        for host in hosts:
+            LOG.info(host)
+
+            devs = host["devs"][0]
+            keys = ["bdf", "iface", "mac"]
+            key = devs.keys()[0]
+            if key in keys:
+                name = devs[key]
+            else:
+                raise Exception("error devs :%s", devs)
+
+            query = Fabricant(host["agent"], self.chanl)
+            nic_info = query.get_device_detail(identity=name)
+
+            LOG.info(nic_info)
+
+            os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host["agent"])
+            LOG.info(os_info)
+            LOG.info(cpu_info)
+            LOG.info(mem_info)
+            LOG.info(hw_info)
+
+            self.dbconn.add_host_2task(taskid,
+                                       host["agent"],
+                                       json.dumps(hw_info[cst.HW_INFO]),
+                                       json.dumps(cpu_info[cst.CPU_INFO]),
+                                       json.dumps(mem_info[cst.MEMORY_INFO]),
+                                       nic_info["desc"],
+                                       json.dumps(os_info[cst.OS_INFO]))
+
+        self.dbconn.add_extent_2task(taskid, "CETH", "driver", "version 2.0")
+        self.dbconn.add_extent_2task(taskid, "EVS", "switch", "version 3.0")
+        return taskid
+
+    def settings(self, mail=False, perf=False):
+        LOG.info("mail:%s, perf:%s" % (mail, perf))
+        if mail:
+            MailSettings().input()
+        if perf:
+            PerfSettings().input()
+
+    def report(self, rpath='./', mail_off=False, taskid=-1):
+        report = rp.Report(self.dbconn, rpath)
+        if taskid == -1:
+            taskid = self.dbconn.get_last_taskid()
+        report.report(taskid, mail_off)
+        info_str = "do report over"
+        return info_str
+
+    def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False, save_on=False, report_on=False, mail_on=False):
+        LOG.info(case)
+        LOG.info("build_on:%s report_on:%s mail_on:%s" % (build_on, report_on, mail_on))
+        casetag = case['case']
+        tool = case['tool']
+        protocol = case['protocol']
+        profile = case['profile']
+        ttype = case['type']
+        sizes = case['sizes']
+
+        ret, ret_str = chk.check_case_params(protocol, ttype, tool)
+        if not ret:
+            return ret_str
+
+        scenario = self.dbconn.query_scenario(casetag)
+        LOG.info(scenario)
+        if not scenario:
+            LOG.warn("not support the case:%s", casetag)
+            return
+
+        config_file = os.path.join("/etc/vstf/env", scenario + ".json")
+
+        LOG.info(config_file)
+        env = Builder(self.chanl, config_file)
+        if build_on:
+            env.build()
+        flows_settings = FlowsSettings()
+        tool_settings = ToolSettings()
+        tester_settings = TesterSettings()
+        flow_producer = FlowsProducer(self.chanl, flows_settings)
+        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
+
+        perf = pf.Performance(self.chanl, provider)
+        flow_producer.create(scenario, casetag)
+        result = perf.run(tool, protocol, ttype, sizes, affctl)
+        LOG.info(flows_settings.settings)
+        LOG.info(result)
+        if save_on:
+            taskid = self._create_task(scenario)
+            testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
+            LOG.info(testid)
+            self.dbconn.add_data_2test(testid, result)
+            if report_on:
+                self.report(rpath, not mail_on, taskid)
+        return result
+
+    def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
+        perf_settings = PerfSettings()
+        flows_settings = FlowsSettings()
+        tool_settings = ToolSettings()
+        tester_settings = TesterSettings()
+        flow_producer = FlowsProducer(self.chanl, flows_settings)
+        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
+        perf = pf.Performance(self.chanl, provider)
+        tests = perf_settings.settings
+
+        for scenario, cases in tests.items():
+            LOG.info(scenario)
+            if not cases:
+                continue
+
+            config_file = os.path.join("/etc/vstf/env", scenario + ".json")
+
+            LOG.info(config_file)
+            env = Builder(self.chanl, config_file)
+            env.build()
+
+            taskid = self._create_task(scenario)
+
+            for case in cases:
+                LOG.info(case)
+                casetag = case['case']
+                tool = case['tool']
+                protocol = case['protocol']
+                profile = case['profile']
+                ttype = case['type']
+                sizes = case['sizes']
+
+                ret, ret_str = chk.check_case_params(protocol, ttype, tool)
+                if not ret:
+                    LOG.warn(ret_str)
+                    continue
+
+                flow_producer.create(scenario, casetag)
+                result = perf.run(tool, protocol, ttype, sizes, affctl)
+                LOG.info(result)
+
+                testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
+                LOG.info(testid)
+
+                self.dbconn.add_data_2test(testid, result)
+
+            if report_on:
+                self.report(rpath, not mail_on, taskid)
+
+        info_str = "do batch perf test successfully"
+        return info_str
+
+    def collect_host_info(self, target):
+        if self.collection is not None:
+            return self.collection.collect_host_info(target)
+        else:
+            return "collection is None"
+
+
+class Manager(daemon.Daemon):
+    def __init__(self):
+        """
+        The manager will create a socket for vstfadm.
+        also the manager own a ops chains
+        """
+        super(Manager, self).__init__(cst.vstf_pid)
+        # the connection of socket
+        self.conn = None
+        # the operations of manager
+        self.ops = None
+        # record the daemon run flag
+        self.run_flag = True
+
+    def deal_unknown_obj(self, obj):
+        return "unknown response %s" % obj
+
+    def run(self):
+        signal.signal(signal.SIGTERM, self.daemon_die)
+        # setup the socket server for communicating with vstfadm
+        try:
+            self.conn = unix.UdpServer()
+            self.conn.bind()
+            self.conn.listen()
+        except Exception as e:
+            raise e
+
+        # accept the connection of vstfadm and recv the command
+        # run the command from vstfadm and return the response
+        while self.run_flag:
+            conn, addr = self.conn.accept()
+            LOG.debug("accept the conn: %(conn)s", {'conn': conn})
+
+            # recv the msg until the conn break.
+
+            while True:
+                try:
+                    data = message.recv(conn.recv)
+                    LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
+                    msg = message.decode(data)
+                    body = message.get_body(msg)
+                    context = message.get_context(msg)
+                except RuntimeError:
+                    LOG.debug("manage catch the connection close!")
+                    break
+                except Exception as e:
+                    LOG.error("Manager recv message from socket failed.")
+                    self.daemon_die()
+                    raise e
+
+                try:
+                    func = getattr(self.ops, body.get('method'))
+                    LOG.info("Call function:%s, args:%s",
+                             func.__name__, body.get('args'))
+                    response = func(**body.get('args'))
+                    LOG.info("response: %s", response)
+                except excepts.UnsolvableExit as e:
+                    msg = "The manager opps, exit now"
+                    LOG.error(msg)
+                    # the manager has no need to be continue, just return
+                    # this msg and exit normal
+                    self.daemon_die()
+                    raise e
+                except Exception as e:
+                    # here just the function failed no need exit, just return the msg
+                    msg = "Run function failed. [ %s ]" % (e)
+                    response = msg
+                    LOG.error(msg)
+                try:
+                    response = message.add_context(response, **context)
+                    LOG.debug("Manager send the response: <%(r)s", {'r': response})
+                    message.send(conn.send, message.encode(response))
+                except Exception as e:
+                    self.daemon_die()
+                    raise e
+            # close the connection when conn down
+            conn.close()
+
+    def daemon_die(self, signum, frame):
+        """overwrite daemon.Daemon.daemon_die(self)"""
+        LOG.info("manage catch the signal %s to exit." % signum)
+        if self.conn:
+            # we can not close the conn direct, just tell manager to stop accept
+            self.run_flag = False
+
+        if self.ops:
+            # stop the ops's proxy
+            # maybe happen AttributeError: 'BlockingConnection' object has no attribute 'disconnect'
+            # this a know bug in pika. fix in 0.9.14 release
+            try:
+                self.ops.chanl.close()
+            except AttributeError:
+                LOG.warning("The connection close happens attribute error")
+
+    def start_manage(self, monitor="localhost", port=5672):
+        try:
+            # create manager's ops chains here will create a proxy to rabbitmq
+            self.ops = OpsChains(monitor, port)
+        except Exception as e:
+            raise e
+        self.start()
+
+    def stop_manage(self):
+        self.stop()
+
+
+@cliutil.arg("--monitor",
+             dest="monitor",
+             default="localhost",
+             action="store",
+             help="which ip to be monitored")
+@cliutil.arg("--port",
+             dest="port",
+             default="5672",
+             action="store",
+             help="rabbitmq conn server")
+def do_start(args):
+    Manager().start_manage(args.monitor, args.port)
+
+
+def do_stop(args):
+    Manager().stop_manage()
+
+
+def main():
+    """this is for vstfctl"""
+    setup_logging(level=logging.INFO, log_file="/var/log/vstf/vstf-manager.log", clevel=logging.INFO)
+    parser = VstfParser(prog="vstf-manager", description="vstf manager command line")
+    parser.set_subcommand_parser(target=sys.modules[__name__])
+    args = parser.parse_args()
+    args.func(args)