1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
19 from vstf.common import unix, message, cliutil, excepts
20 from vstf.common.vstfcli import VstfParser
21 from vstf.common.log import setup_logging
22 from vstf.common import daemon
23 from vstf.controller.fabricant import Fabricant
24 from vstf.agent.env.basic.commandline import CommandLine
25 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
26 from vstf.controller.env_build.env_collect import EnvCollectApi
27 from vstf.controller.database.dbinterface import DbManage
28 import vstf.controller.sw_perf.performance as pf
29 from vstf.controller.settings.tester_settings import TesterSettings
30 from vstf.controller.settings.device_settings import DeviceSettings
31 from vstf.controller.settings.flows_settings import FlowsSettings
32 from vstf.controller.settings.mail_settings import MailSettings
33 from vstf.controller.settings.tool_settings import ToolSettings
34 from vstf.controller.settings.perf_settings import PerfSettings
35 from vstf.controller.sw_perf.perf_provider import PerfProvider
36 from vstf.controller.sw_perf.flow_producer import FlowsProducer
37 from vstf.controller.settings.forwarding_settings import ForwardingSettings
38 import vstf.controller.reporters.reporter as rp
39 import vstf.rpc_frame_work.rpc_producer as rpc
40 import vstf.common.constants as cst
41 import vstf.common.check as chk
# Module-level logger, named after this module via ``__name__``.
LOG = logging.getLogger(__name__)
47 class OpsChains(object):
48 def __init__(self, monitor, port):
49 """The ops chains will setup the proxy to rabbitmq
50 and setup a thread to watch the queues of rabbitmq
53 LOG.info("VSTF Manager start to listen to %s", monitor)
54 if not os.path.exists(cst.VSTFCPATH):
55 os.mkdir(cst.VSTFCPATH)
57 self.chanl = rpc.Server(host=monitor, port=port)
58 self.dbconn = DbManage()
59 self.collection = EnvCollectApi(self.chanl)
61 def list_devs(self, **kwargs):
62 target = kwargs.get('host')
64 respond = "the target is empty, not support now."
66 respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
def src_install(self, host, config_file):
    """Send a "src_install" request for ``host`` built from a JSON config.

    :param host: target handed to ``self.chanl.call`` together with the
        message.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: whatever ``self.chanl.call`` returns for the request.
    :raises Exception: if ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed as soon
    # as parsing finishes instead of being left to the garbage collector.
    with open(config_file) as config_fd:
        cfg = json.load(config_fd)
    msg = self.chanl.make_msg("src_install", cfg=cfg)
    # timeout=1000 is passed straight to the rpc layer; the unit is
    # defined there (presumably seconds -- TODO confirm).
    return self.chanl.call(msg, host, timeout=1000)
def create_images(self, host, config_file):
    """Send a "create_images" request for ``host`` built from a JSON config.

    :param host: target handed to ``self.chanl.call`` together with the
        message.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: whatever ``self.chanl.call`` returns for the request.
    :raises Exception: if ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed as soon
    # as parsing finishes instead of being left to the garbage collector.
    with open(config_file) as config_fd:
        cfg = json.load(config_fd)
    msg = self.chanl.make_msg("create_images", cfg=cfg)
    # timeout=1000 is passed straight to the rpc layer; the unit is
    # defined there (presumably seconds -- TODO confirm).
    return self.chanl.call(msg, host, timeout=1000)
def clean_images(self, host, config_file):
    """Send a "clean_images" request for ``host`` built from a JSON config.

    :param host: target handed to ``self.chanl.call`` together with the
        message.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: whatever ``self.chanl.call`` returns for the request.
    :raises Exception: if ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed as soon
    # as parsing finishes instead of being left to the garbage collector.
    with open(config_file) as config_fd:
        cfg = json.load(config_fd)
    msg = self.chanl.make_msg("clean_images", cfg=cfg)
    # timeout=1000 is passed straight to the rpc layer; the unit is
    # defined there (presumably seconds -- TODO confirm).
    return self.chanl.call(msg, host, timeout=1000)
90 def apply_model(self, host, model=None, config_file=None):
91 if config_file is None:
92 config_file = "/etc/vstf/env/%s.json" % model
93 if not os.path.exists(config_file):
94 raise Exception("Can not found the config file.")
95 env = Builder(self.chanl, config_file)
99 def disapply_model(self, host, model=None, config_file=None):
100 if config_file is None:
101 config_file = "/etc/vstf/env/%s.json" % model
102 if not os.path.exists(config_file):
103 raise Exception("Can not found the config file.")
104 env = Builder(self.chanl, config_file)
108 def list_tasks(self):
109 ret = self.dbconn.query_tasks()
110 head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
115 def affctl_list(self, host):
117 return "Need input the host"
118 return Fabricant(host, self.chanl).affctl_list()
120 def _create_task(self, scenario):
121 taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(cst.TIME_FORMAT),
122 desc=scenario + "Test")
123 LOG.info("new Task id:%s" % taskid)
125 raise Exception("DB create task failed.")
127 device = DeviceSettings().settings
128 hosts = [device["host"], device["tester"]]
132 devs = host["devs"][0]
133 keys = ["bdf", "iface", "mac"]
138 raise Exception("error devs :%s", devs)
140 query = Fabricant(host["agent"], self.chanl)
141 nic_info = query.get_device_detail(identity=name)
145 os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host["agent"])
151 self.dbconn.add_host_2task(taskid,
153 json.dumps(hw_info[cst.HW_INFO]),
154 json.dumps(cpu_info[cst.CPU_INFO]),
155 json.dumps(mem_info[cst.MEMORY_INFO]),
157 json.dumps(os_info[cst.OS_INFO]))
159 self.dbconn.add_extent_2task(taskid, "ixgbe", "driver", "")
160 self.dbconn.add_extent_2task(taskid, "OVS", "switch", "")
163 def settings(self, head, tail):
165 forward_settings = ForwardingSettings()
168 "namespace":forward_settings.settings["head"]["namespace"]
172 "namespace":forward_settings.settings["tail"]["namespace"]
176 forward_settings.set_head(head_d)
177 forward_settings.set_tail(tail_d)
179 def report(self, rpath='./', mail_off=False, taskid=-1):
180 report = rp.Report(self.dbconn, rpath)
182 taskid = self.dbconn.get_last_taskid()
183 report.report(taskid, mail_off)
184 info_str = "do report over"
187 def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False, save_on=False, report_on=False,
190 LOG.info("build_on:%s report_on:%s mail_on:%s" % (build_on, report_on, mail_on))
191 casetag = case['case']
193 protocol = case['protocol']
197 sizes = case['sizes']
199 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
203 scenario = self.dbconn.query_scenario(casetag)
206 LOG.warn("not support the case:%s", casetag)
209 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
211 LOG.info(config_file)
212 env = Builder(self.chanl, config_file)
215 flows_settings = FlowsSettings()
216 tool_settings = ToolSettings()
217 tester_settings = TesterSettings()
218 flow_producer = FlowsProducer(self.chanl, flows_settings)
219 provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
221 perf = pf.Performance(self.chanl, provider)
222 flow_producer.create(scenario, casetag)
223 result = perf.run(tool, protocol, ttype, sizes, affctl)
224 LOG.info(flows_settings.settings)
227 taskid = self._create_task(scenario)
228 testid = self.dbconn.add_test_2task(taskid, casetag, protocol, ttype, switch, provider, tool)
230 self.dbconn.add_data_2test(testid, result)
232 self.report(rpath, not mail_on, taskid)
235 def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
236 perf_settings = PerfSettings()
237 flows_settings = FlowsSettings()
238 tool_settings = ToolSettings()
239 tester_settings = TesterSettings()
240 flow_producer = FlowsProducer(self.chanl, flows_settings)
241 provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
242 perf = pf.Performance(self.chanl, provider)
243 tests = perf_settings.settings
245 for scenario, cases in tests.items():
250 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
252 LOG.info(config_file)
253 env = Builder(self.chanl, config_file)
256 taskid = self._create_task(scenario)
260 casetag = case['case']
262 protocol = case['protocol']
266 sizes = case['sizes']
268 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
273 flow_producer.create(scenario, casetag)
274 result = perf.run(tool, protocol, ttype, sizes, affctl)
277 testid = self.dbconn.add_test_2task(taskid, casetag, protocol, ttype, switch, provider, tool)
280 self.dbconn.add_data_2test(testid, result)
283 self.report(rpath, not mail_on, taskid)
285 info_str = "do batch perf test successfully"
def collect_host_info(self, target):
    """Delegate host-info collection for ``target`` to ``self.collection``.

    :param target: value forwarded unchanged to
        ``self.collection.collect_host_info``.
    :return: the collector's result, or the string "collection is None"
        when no collector is attached to this object.
    """
    # Guard clause: without a collector there is nothing to delegate to.
    if self.collection is None:
        return "collection is None"
    collector = self.collection
    return collector.collect_host_info(target)
295 class Manager(daemon.Daemon):
298 The manager will create a socket for vstfadm.
299 also the manager own a ops chains
301 super(Manager, self).__init__(cst.vstf_pid)
302 # the connection of socket
304 # the operations of manager
306 # record the daemon run flag
def deal_unknown_obj(self, obj):
    """Build the reply text used for an object that is not recognised.

    :param obj: the unrecognised object; rendered with ``%s``.
    :return: a string of the form "unknown response <self>:<obj>".
    """
    template = "unknown response %s:%s"
    fields = (self, obj)
    return template % fields
313 signal.signal(signal.SIGTERM, self.daemon_die)
314 # setup the socket server for communicating with vstfadm
316 self.conn = unix.UdpServer()
319 except Exception as e:
322 # accept the connection of vstfadm and recv the command
323 # run the command from vstfadm and return the response
325 conn, addr = self.conn.accept()
326 LOG.debug("accept the conn: %(conn)s", {'conn': conn})
328 # recv the msg until the conn break.
332 data = message.recv(conn.recv)
333 LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
334 msg = message.decode(data)
335 body = message.get_body(msg)
336 context = message.get_context(msg)
338 LOG.debug("manage catch the connection close!")
340 except Exception as e:
341 LOG.error("Manager recv message from socket failed.")
346 func = getattr(self.ops, body.get('method'))
347 LOG.info("Call function:%s, args:%s",
348 func.__name__, body.get('args'))
349 response = func(**body.get('args'))
350 LOG.info("response: %s", response)
351 except excepts.UnsolvableExit as e:
352 msg = "The manager opps, exit now"
354 # the manager has no need to be continue, just return
355 # this msg and exit normal
358 except Exception as e:
359 # here just the function failed no need exit, just return the msg
360 msg = "Run function failed. [ %s ]" % (e)
364 response = message.add_context(response, **context)
365 LOG.debug("Manager send the response: <%(r)s", {'r': response})
366 message.send(conn.send, message.encode(response))
367 except Exception as e:
370 # close the connection when conn down
373 def daemon_die(self, signum, frame):
374 """overwrite daemon.Daemon.daemon_die(self)"""
375 LOG.info("manage catch the signal %s to exit." % signum)
377 # we can not close the conn direct, just tell manager to stop accept
378 self.run_flag = False
381 # stop the ops's proxy
382 # maybe happen AttributeError: 'BlockingConnection' object has no attribute 'disconnect'
383 # this a know bug in pika. fix in 0.9.14 release
385 self.ops.chanl.close()
386 except AttributeError:
387 LOG.warning("The connection close happens attribute error")
389 def start_manage(self, monitor="localhost", port=5672):
391 # create manager's ops chains here will create a proxy to rabbitmq
392 self.ops = OpsChains(monitor, port)
393 except Exception as e:
397 def stop_manage(self):
401 @cliutil.arg("--monitor",
405 help="which ip to be monitored")
406 @cliutil.arg("--port",
410 help="rabbitmq conn server")
412 Manager().start_manage(args.monitor, args.port)
416 Manager().stop_manage()
420 """this is for vstfctl"""
421 setup_logging(level=logging.INFO, log_file="/var/log/vstf/vstf-manager.log", clevel=logging.INFO)
422 parser = VstfParser(prog="vstf-manager", description="vstf manager command line")
423 parser.set_subcommand_parser(target=sys.modules[__name__])
424 args = parser.parse_args()