1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
19 from vstf.common import unix, message, cliutil, excepts
20 from vstf.common.vstfcli import VstfParser
21 from vstf.common.log import setup_logging
22 from vstf.common import daemon
23 from vstf.controller.fabricant import Fabricant
24 from vstf.agent.env.basic.commandline import CommandLine
25 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
26 from vstf.controller.env_build.env_collect import EnvCollectApi
27 from vstf.controller.database.dbinterface import DbManage
28 import vstf.controller.sw_perf.performance as pf
29 from vstf.controller.settings.tester_settings import TesterSettings
30 from vstf.controller.settings.device_settings import DeviceSettings
31 from vstf.controller.settings.flows_settings import FlowsSettings
32 from vstf.controller.settings.mail_settings import MailSettings
33 from vstf.controller.settings.tool_settings import ToolSettings
34 from vstf.controller.settings.perf_settings import PerfSettings
35 from vstf.controller.sw_perf.perf_provider import PerfProvider
36 from vstf.controller.sw_perf.flow_producer import FlowsProducer
37 from vstf.controller.settings.forwarding_settings import ForwardingSettings
38 import vstf.controller.reporters.reporter as rp
39 import vstf.rpc_frame_work.rpc_producer as rpc
40 import vstf.common.constants as cst
41 import vstf.common.check as chk
43 LOG = logging.getLogger(__name__)
47 class OpsChains(object):
49 def __init__(self, monitor, port):
50 """The ops chains will setup the proxy to rabbitmq
51 and setup a thread to watch the queues of rabbitmq
54 LOG.info("VSTF Manager start to listen to %s", monitor)
55 if not os.path.exists(cst.VSTFCPATH):
56 os.mkdir(cst.VSTFCPATH)
58 self.chanl = rpc.Server(host=monitor, port=port)
59 self.dbconn = DbManage()
60 self.collection = EnvCollectApi(self.chanl)
62 def list_devs(self, **kwargs):
63 target = kwargs.get('host')
65 respond = "the target is empty, not support now."
67 respond = self.chanl.call(
68 self.chanl.make_msg("list_nic_devices"), target)
def src_install(self, host, config_file):
    """Send a "src_install" request built from a JSON config file.

    :param host: target handed to ``self.chanl.call`` for this request.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: the value returned by ``self.chanl.call``.
    :raises Exception: when ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed right
    # away, even if the JSON is malformed and json.load raises.
    with open(config_file) as config_fp:
        cfg = json.load(config_fp)
    msg = self.chanl.make_msg("src_install", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
def create_images(self, host, config_file):
    """Send a "create_images" request built from a JSON config file.

    :param host: target handed to ``self.chanl.call`` for this request.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: the value returned by ``self.chanl.call``.
    :raises Exception: when ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed right
    # away, even if the JSON is malformed and json.load raises.
    with open(config_file) as config_fp:
        cfg = json.load(config_fp)
    msg = self.chanl.make_msg("create_images", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
def clean_images(self, host, config_file):
    """Send a "clean_images" request built from a JSON config file.

    :param host: target handed to ``self.chanl.call`` for this request.
    :param config_file: path of a JSON file; its parsed content is sent
        as the ``cfg`` argument of the message.
    :return: the value returned by ``self.chanl.call``.
    :raises Exception: when ``config_file`` does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Read through a context manager so the file handle is closed right
    # away, even if the JSON is malformed and json.load raises.
    with open(config_file) as config_fp:
        cfg = json.load(config_fp)
    msg = self.chanl.make_msg("clean_images", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
92 def apply_model(self, host, model=None, config_file=None):
93 if config_file is None:
94 config_file = "/etc/vstf/env/%s.json" % model
95 if not os.path.exists(config_file):
96 raise Exception("Can not found the config file.")
97 env = Builder(self.chanl, config_file)
101 def disapply_model(self, host, model=None, config_file=None):
102 if config_file is None:
103 config_file = "/etc/vstf/env/%s.json" % model
104 if not os.path.exists(config_file):
105 raise Exception("Can not found the config file.")
106 env = Builder(self.chanl, config_file)
110 def list_tasks(self):
111 ret = self.dbconn.query_tasks()
112 head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
117 def affctl_list(self, host):
119 return "Need input the host"
120 return Fabricant(host, self.chanl).affctl_list()
122 def _create_task(self, scenario):
123 taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(
124 cst.TIME_FORMAT), desc=scenario + "Test")
125 LOG.info("new Task id:%s" % taskid)
127 raise Exception("DB create task failed.")
129 device = DeviceSettings().settings
130 hosts = [device["host"], device["tester"]]
134 devs = host["devs"][0]
135 keys = ["bdf", "iface", "mac"]
140 raise Exception("error devs :%s", devs)
142 query = Fabricant(host["agent"], self.chanl)
143 nic_info = query.get_device_detail(identity=name)
147 os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host[
154 self.dbconn.add_host_2task(taskid,
156 json.dumps(hw_info[cst.HW_INFO]),
157 json.dumps(cpu_info[cst.CPU_INFO]),
158 json.dumps(mem_info[cst.MEMORY_INFO]),
160 json.dumps(os_info[cst.OS_INFO]))
162 self.dbconn.add_extent_2task(taskid, "ixgbe", "driver", "")
163 self.dbconn.add_extent_2task(taskid, "OVS", "switch", "")
166 def settings(self, head, tail):
168 forward_settings = ForwardingSettings()
171 "namespace": forward_settings.settings["head"]["namespace"]
175 "namespace": forward_settings.settings["tail"]["namespace"]
179 forward_settings.set_head(head_d)
180 forward_settings.set_tail(tail_d)
182 def report(self, rpath='./', mail_off=False, taskid=-1):
183 report = rp.Report(self.dbconn, rpath)
185 taskid = self.dbconn.get_last_taskid()
186 report.report(taskid, mail_off)
187 info_str = "do report over"
201 "build_on:%s report_on:%s mail_on:%s" %
202 (build_on, report_on, mail_on))
203 casetag = case['case']
205 protocol = case['protocol']
209 sizes = case['sizes']
211 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
215 scenario = self.dbconn.query_scenario(casetag)
218 LOG.warn("not support the case:%s", casetag)
221 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
223 LOG.info(config_file)
224 env = Builder(self.chanl, config_file)
227 flows_settings = FlowsSettings()
228 tool_settings = ToolSettings()
229 tester_settings = TesterSettings()
230 flow_producer = FlowsProducer(self.chanl, flows_settings)
231 provider = PerfProvider(
232 flows_settings.settings,
233 tool_settings.settings,
234 tester_settings.settings)
236 perf = pf.Performance(self.chanl, provider)
237 flow_producer.create(scenario, casetag)
238 result = perf.run(tool, protocol, ttype, sizes, affctl)
239 LOG.info(flows_settings.settings)
242 taskid = self._create_task(scenario)
243 testid = self.dbconn.add_test_2task(
244 taskid, casetag, protocol, ttype, switch, provider, tool)
246 self.dbconn.add_data_2test(testid, result)
248 self.report(rpath, not mail_on, taskid)
257 perf_settings = PerfSettings()
258 flows_settings = FlowsSettings()
259 tool_settings = ToolSettings()
260 tester_settings = TesterSettings()
261 flow_producer = FlowsProducer(self.chanl, flows_settings)
262 provider = PerfProvider(
263 flows_settings.settings,
264 tool_settings.settings,
265 tester_settings.settings)
266 perf = pf.Performance(self.chanl, provider)
267 tests = perf_settings.settings
269 for scenario, cases in tests.items():
274 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
276 LOG.info(config_file)
277 env = Builder(self.chanl, config_file)
280 taskid = self._create_task(scenario)
284 casetag = case['case']
286 protocol = case['protocol']
290 sizes = case['sizes']
292 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
297 flow_producer.create(scenario, casetag)
298 result = perf.run(tool, protocol, ttype, sizes, affctl)
301 testid = self.dbconn.add_test_2task(
302 taskid, casetag, protocol, ttype, switch, provider, tool)
305 self.dbconn.add_data_2test(testid, result)
308 self.report(rpath, not mail_on, taskid)
310 info_str = "do batch perf test successfully"
def collect_host_info(self, target):
    """Delegate host-info collection for ``target`` to ``self.collection``.

    Returns the collector's result, or the string "collection is None"
    when no collector is attached to this object.
    """
    collector = self.collection
    if collector is None:
        return "collection is None"
    return collector.collect_host_info(target)
320 class Manager(daemon.Daemon):
324 The manager will create a socket for vstfadm.
325 also the manager own a ops chains
327 super(Manager, self).__init__(cst.vstf_pid)
328 # the connection of socket
330 # the operations of manager
332 # record the daemon run flag
def deal_unknown_obj(self, obj):
    """Build the text reported for an object that is not recognised.

    The message embeds the string forms of both this instance and ``obj``.
    """
    template = "unknown response %s:%s"
    pieces = (self, obj)
    return template % pieces
339 signal.signal(signal.SIGTERM, self.daemon_die)
340 # setup the socket server for communicating with vstfadm
342 self.conn = unix.UdpServer()
345 except Exception as e:
348 # accept the connection of vstfadm and recv the command
349 # run the command from vstfadm and return the response
351 conn, addr = self.conn.accept()
352 LOG.debug("accept the conn: %(conn)s", {'conn': conn})
354 # recv the msg until the conn break.
358 data = message.recv(conn.recv)
359 LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
360 msg = message.decode(data)
361 body = message.get_body(msg)
362 context = message.get_context(msg)
364 LOG.debug("manage catch the connection close!")
366 except Exception as e:
367 LOG.error("Manager recv message from socket failed.")
372 func = getattr(self.ops, body.get('method'))
373 LOG.info("Call function:%s, args:%s",
374 func.__name__, body.get('args'))
375 response = func(**body.get('args'))
376 LOG.info("response: %s", response)
377 except excepts.UnsolvableExit as e:
378 msg = "The manager opps, exit now"
380 # the manager has no need to be continue, just return
381 # this msg and exit normal
384 except Exception as e:
385 # here just the function failed no need exit, just return
387 msg = "Run function failed. [ %s ]" % (e)
391 response = message.add_context(response, **context)
393 "Manager send the response: <%(r)s", {
395 message.send(conn.send, message.encode(response))
396 except Exception as e:
399 # close the connection when conn down
402 def daemon_die(self, signum, frame):
403 """overwrite daemon.Daemon.daemon_die(self)"""
404 LOG.info("manage catch the signal %s to exit." % signum)
406 # we can not close the conn direct, just tell manager to stop
408 self.run_flag = False
411 # stop the ops's proxy
412 # maybe happen AttributeError: 'BlockingConnection' object has no attribute 'disconnect'
413 # this a know bug in pika. fix in 0.9.14 release
415 self.ops.chanl.close()
416 except AttributeError:
417 LOG.warning("The connection close happens attribute error")
419 def start_manage(self, monitor="localhost", port=5672):
421 # create manager's ops chains here will create a proxy to rabbitmq
422 self.ops = OpsChains(monitor, port)
423 except Exception as e:
427 def stop_manage(self):
431 @cliutil.arg("--monitor",
435 help="which ip to be monitored")
436 @cliutil.arg("--port",
440 help="rabbitmq conn server")
442 Manager().start_manage(args.monitor, args.port)
446 Manager().stop_manage()
450 """this is for vstfctl"""
453 log_file="/var/log/vstf/vstf-manager.log",
457 description="vstf manager command line")
458 parser.set_subcommand_parser(target=sys.modules[__name__])
459 args = parser.parse_args()