9 from vstf.common import unix, message, cliutil, excepts
10 from vstf.common.vstfcli import VstfParser
11 from vstf.common.log import setup_logging
12 from vstf.common import daemon
13 from vstf.rpc_frame_work import rpc_producer
14 from vstf.controller.fabricant import Fabricant
15 from vstf.agent.env.basic.commandline import CommandLine
16 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
17 from vstf.controller.env_build.env_collect import EnvCollectApi
18 from vstf.controller.database.dbinterface import DbManage
19 import vstf.controller.sw_perf.performance as pf
20 from vstf.controller.settings.tester_settings import TesterSettings
21 from vstf.controller.settings.device_settings import DeviceSettings
22 from vstf.controller.settings.flows_settings import FlowsSettings
23 from vstf.controller.settings.mail_settings import MailSettings
24 from vstf.controller.settings.tool_settings import ToolSettings
25 from vstf.controller.settings.perf_settings import PerfSettings
26 from vstf.controller.sw_perf.perf_provider import PerfProvider
27 from vstf.controller.sw_perf.flow_producer import FlowsProducer
28 import vstf.controller.reporters.reporter as rp
29 import vstf.common.constants as cst
30 import vstf.common.check as chk
# Module-level logger; named after this module (vstf manager).
32 LOG = logging.getLogger(__name__)
# Controller-side operations facade: owns the RabbitMQ RPC proxy (chanl),
# the results database handle (dbconn) and the environment-info collector.
# NOTE(review): lines below carry stray line-number prefixes and gaps from a
# numbered dump; code is kept byte-identical, only comments are added.
36 class OpsChains(object):
37 def __init__(self, monitor, port):
38 """The ops chains will setup the proxy to rabbitmq
39 and setup a thread to watch the queues of rabbitmq
42 super(OpsChains, self).__init__()
43 if not os.path.exists(cst.VSTFCPATH):
44 os.mkdir(cst.VSTFCPATH)
46 LOG.info("VSTF Manager start to listen to %s", monitor)
47 self.chanl = rpc_producer.Server(host=monitor, port=port)
48 self.dbconn = DbManage()
49 self.collection = EnvCollectApi(self.chanl)
# List NIC devices on the agent named by kwargs['host'] via RPC.
# NOTE(review): the if/else around the two 'respond' assignments (original
# lines 53/55/57-58) is missing from this excerpt — the first string is the
# empty-target reply, the second the RPC result; confirm in full source.
51 def list_devs(self, **kwargs):
52 target = kwargs.get('host')
54 respond = "the target is empty, not support now."
56 respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
def src_install(self, host, config_file):
    """Install software from source on *host* via a long-running RPC call.

    :param host: agent identity the RPC message is dispatched to.
    :param config_file: path to a JSON file describing what to install.
    :returns: the remote call's response (1000 s timeout for slow builds).
    :raises Exception: if *config_file* does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Use a context manager so the handle is closed deterministically
    # (json.load(open(...)) leaks the file object until GC).
    with open(config_file) as cfg_fp:
        cfg = json.load(cfg_fp)
    msg = self.chanl.make_msg("src_install", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
def create_images(self, host, config_file):
    """Create VM images on *host* via a long-running RPC call.

    :param host: agent identity the RPC message is dispatched to.
    :param config_file: path to a JSON file describing the images.
    :returns: the remote call's response (1000 s timeout).
    :raises Exception: if *config_file* does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Close the config file deterministically instead of leaking the
    # handle from json.load(open(...)).
    with open(config_file) as cfg_fp:
        cfg = json.load(cfg_fp)
    msg = self.chanl.make_msg("create_images", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
def clean_images(self, host, config_file):
    """Remove previously created VM images on *host* via RPC.

    :param host: agent identity the RPC message is dispatched to.
    :param config_file: path to a JSON file describing the images.
    :returns: the remote call's response (1000 s timeout).
    :raises Exception: if *config_file* does not exist.
    """
    if not os.path.exists(config_file):
        raise Exception("Can not found the config file.")
    # Close the config file deterministically instead of leaking the
    # handle from json.load(open(...)).
    with open(config_file) as cfg_fp:
        cfg = json.load(cfg_fp)
    msg = self.chanl.make_msg("clean_images", cfg=cfg)
    return self.chanl.call(msg, host, timeout=1000)
# Build/apply a deployment model on *host*; when no explicit config is
# given, the path is derived from the model name.
80 def apply_model(self, host, model=None, config_file=None):
81 if config_file is None:
82 config_file = "/etc/vstf/env/%s.json" % model
83 if not os.path.exists(config_file):
84 raise Exception("Can not found the config file.")
85 env = Builder(self.chanl, config_file)
# NOTE(review): the env.build(...) call and return (original lines 86-88)
# are not visible in this excerpt.
# Tear down a previously applied deployment model; mirrors apply_model's
# config-file resolution.
89 def disapply_model(self, host, model=None, config_file=None):
90 if config_file is None:
91 config_file = "/etc/vstf/env/%s.json" % model
92 if not os.path.exists(config_file):
93 raise Exception("Can not found the config file.")
94 env = Builder(self.chanl, config_file)
# NOTE(review): the env clean/teardown call and return (original lines
# 95-97) are not visible in this excerpt.
# Fragment of a task-listing method (its def line is outside this excerpt):
# queries all tasks from the DB and builds the table header row.
99 ret = self.dbconn.query_tasks()
100 head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
# Return the CPU-affinity listing of *host* via a Fabricant RPC wrapper.
# NOTE(review): the 'if not host:' guard (original line 106) is missing
# from this excerpt; the first return is the empty-host reply.
105 def affctl_list(self, host):
107 return "Need input the host"
108 return Fabricant(host, self.chanl).affctl_list()
# Create a DB task record for *scenario* and attach per-host hardware, CPU,
# memory and OS rows plus driver/switch extent rows.
# NOTE(review): several original body lines (114, 116, 119-121, 124-127,
# 129, 132-134, 136-140, 142, 146, 148, 151-152) are missing from this
# excerpt; comments below are hedged accordingly.
110 def _create_task(self, scenario):
111 taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(cst.TIME_FORMAT),
112 desc=scenario + "Test")
113 LOG.info("new Task id:%s" % taskid)
# presumably raised only when create_task reports failure — confirm guard
# on missing line 114 in the full source
115 raise Exception("DB create task failed.")
117 device = DeviceSettings().settings
118 hosts = [device["host"], device["tester"]]
122 devs = host["devs"][0]
123 keys = ["bdf", "iface", "mac"]
128 raise Exception("error devs :%s", devs)
130 query = Fabricant(host["agent"], self.chanl)
131 nic_info = query.get_device_detail(identity=name)
135 os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host["agent"])
# Persist the collected host facts as JSON blobs on the task row.
141 self.dbconn.add_host_2task(taskid,
143 json.dumps(hw_info[cst.HW_INFO]),
144 json.dumps(cpu_info[cst.CPU_INFO]),
145 json.dumps(mem_info[cst.MEMORY_INFO]),
147 json.dumps(os_info[cst.OS_INFO]))
149 self.dbconn.add_extent_2task(taskid, "CETH", "driver", "version 2.0")
150 self.dbconn.add_extent_2task(taskid, "EVS", "switch", "version 3.0")
# Interactively edit mail and/or perf settings.
# NOTE(review): the 'if mail:' / 'if perf:' guard lines (original 155/157)
# are missing from this excerpt; each .input() is presumably conditional
# on its flag — confirm in full source.
153 def settings(self, mail=False, perf=False):
154 LOG.info("mail:%s, perf:%s" % (mail, perf))
156 MailSettings().input()
158 PerfSettings().input()
# Generate a report for *taskid* under *rpath*; mail_off suppresses mailing.
# NOTE(review): the guard (original line 162) that maps the default
# taskid=-1 to the last task id is missing from this excerpt.
160 def report(self, rpath='./', mail_off=False, taskid=-1):
161 report = rp.Report(self.dbconn, rpath)
163 taskid = self.dbconn.get_last_taskid()
164 report.report(taskid, mail_off)
165 info_str = "do report over"
# Run a single perf test case end-to-end: validate parameters, optionally
# build the environment, drive the traffic tool, then save and report.
# NOTE(review): many original lines (169, 172, 175, 177, 179-181, 183-184,
# 186-187, 189, 192-193, 199, 204-205, 208, 210, 212-213) are missing from
# this excerpt; control flow around the visible statements is incomplete.
168 def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False, save_on=False, report_on=False, mail_on=False):
170 LOG.info("build_on:%s report_on:%s mail_on:%s" % (build_on, report_on, mail_on))
171 casetag = case['case']
173 protocol = case['protocol']
174 profile = case['profile']
176 sizes = case['sizes']
178 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
182 scenario = self.dbconn.query_scenario(casetag)
185 LOG.warn("not support the case:%s", casetag)
188 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
190 LOG.info(config_file)
191 env = Builder(self.chanl, config_file)
# Assemble flow/tool/tester settings and the perf driver for this case.
194 flows_settings = FlowsSettings()
195 tool_settings = ToolSettings()
196 tester_settings = TesterSettings()
197 flow_producer = FlowsProducer(self.chanl, flows_settings)
198 provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
200 perf = pf.Performance(self.chanl, provider)
201 flow_producer.create(scenario, casetag)
202 result = perf.run(tool, protocol, ttype, sizes, affctl)
203 LOG.info(flows_settings.settings)
# Persist the result under a fresh task/test, then (optionally) report.
206 taskid = self._create_task(scenario)
207 testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
209 self.dbconn.add_data_2test(testid, result)
211 self.report(rpath, not mail_on, taskid)
# Batch variant of run_perf_cmd: iterate every scenario/case from the perf
# settings file, building each environment once per scenario.
# NOTE(review): many original lines (223, 225-228, 230, 233-234, 236-238,
# 240, 243, 245, 247-250, 253-254, 256-257, 259-260, 262) are missing from
# this excerpt; loop bodies and guards around the visible statements are
# incomplete.
214 def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
215 perf_settings = PerfSettings()
216 flows_settings = FlowsSettings()
217 tool_settings = ToolSettings()
218 tester_settings = TesterSettings()
219 flow_producer = FlowsProducer(self.chanl, flows_settings)
220 provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
221 perf = pf.Performance(self.chanl, provider)
222 tests = perf_settings.settings
224 for scenario, cases in tests.items():
229 config_file = os.path.join("/etc/vstf/env", scenario + ".json")
231 LOG.info(config_file)
232 env = Builder(self.chanl, config_file)
# One task per scenario; each case becomes a test row under it.
235 taskid = self._create_task(scenario)
239 casetag = case['case']
241 protocol = case['protocol']
242 profile = case['profile']
244 sizes = case['sizes']
246 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
251 flow_producer.create(scenario, casetag)
252 result = perf.run(tool, protocol, ttype, sizes, affctl)
255 testid = self.dbconn.add_test_2task(taskid, casetag, protocol, profile, ttype, tool)
258 self.dbconn.add_data_2test(testid, result)
261 self.report(rpath, not mail_on, taskid)
263 info_str = "do batch perf test successfully"
def collect_host_info(self, target):
    """Return environment details for *target* from the collector.

    Falls back to the notice string "collection is None" when the
    collector has not been created.
    """
    # Guard clause instead of wrapping the call in a positive check.
    if self.collection is None:
        return "collection is None"
    return self.collection.collect_host_info(target)
# Daemonised manager process: serves vstfadm commands over a unix socket
# and dispatches them onto an OpsChains instance.
# NOTE(review): the __init__ def line and the docstring delimiters
# (original lines 274-275, 278) are missing from this excerpt.
273 class Manager(daemon.Daemon):
276 The manager will create a socket for vstfadm.
277 also the manager own a ops chains
279 super(Manager, self).__init__(cst.vstf_pid)
280 # the connection of socket
282 # the operations of manager
284 # record the daemon run flag
def deal_unknown_obj(self, obj):
    """Fallback handler: render an unrecognised message object as text."""
    text = "unknown response %s" % obj
    return text
# Main service-loop fragment (the method's def line and numerous original
# lines — 289-290, 293, 295-296, 298-299, 302, 305, 307-309, 315, 317,
# 320-323, 331, 334-335, 339-341, 346-347, 349 — are missing from this
# excerpt): installs a SIGTERM handler, accepts vstfadm connections on a
# unix socket, decodes each message, dispatches it to the matching
# OpsChains method and sends back the response.
291 signal.signal(signal.SIGTERM, self.daemon_die)
292 # setup the socket server for communicating with vstfadm
294 self.conn = unix.UdpServer()
297 except Exception as e:
300 # accept the connection of vstfadm and recv the command
301 # run the command from vstfadm and return the response
303 conn, addr = self.conn.accept()
304 LOG.debug("accept the conn: %(conn)s", {'conn': conn})
306 # recv the msg until the conn break.
310 data = message.recv(conn.recv)
311 LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
312 msg = message.decode(data)
313 body = message.get_body(msg)
314 context = message.get_context(msg)
316 LOG.debug("manage catch the connection close!")
318 except Exception as e:
319 LOG.error("Manager recv message from socket failed.")
# Look up the requested method on the ops chain and call it with the
# message's kwargs.
324 func = getattr(self.ops, body.get('method'))
325 LOG.info("Call function:%s, args:%s",
326 func.__name__, body.get('args'))
327 response = func(**body.get('args'))
328 LOG.info("response: %s", response)
329 except excepts.UnsolvableExit as e:
330 msg = "The manager opps, exit now"
332 # the manager has no need to be continue, just return
333 # this msg and exit normal
336 except Exception as e:
337 # here just the function failed no need exit, just return the msg
338 msg = "Run function failed. [ %s ]" % (e)
342 response = message.add_context(response, **context)
343 LOG.debug("Manager send the response: <%(r)s", {'r': response})
344 message.send(conn.send, message.encode(response))
345 except Exception as e:
348 # close the connection when conn down
# SIGTERM handler: flag the accept loop to stop, then close the RPC
# channel, tolerating a known pika close bug (fixed in 0.9.14).
# NOTE(review): original lines 354, 357-358 and 362 (including the 'try:'
# around the close) are missing from this excerpt.
351 def daemon_die(self, signum, frame):
352 """overwrite daemon.Daemon.daemon_die(self)"""
353 LOG.info("manage catch the signal %s to exit." % signum)
355 # we can not close the conn direct, just tell manager to stop accept
356 self.run_flag = False
359 # stop the ops's proxy
360 # maybe happen AttributeError: 'BlockingConnection' object has no attribute 'disconnect'
361 # this a know bug in pika. fix in 0.9.14 release
363 self.ops.chanl.close()
364 except AttributeError:
365 LOG.warning("The connection close happens attribute error")
# Wire up the OpsChains RPC proxy, then start the daemon.
# NOTE(review): the 'try:' (original line 368) and the except body /
# daemon start call (original lines 372+) are missing from this excerpt.
367 def start_manage(self, monitor="localhost", port=5672):
369 # create manager's ops chains here will create a proxy to rabbitmq
370 self.ops = OpsChains(monitor, port)
371 except Exception as e:
# Stop entry point; its body (original lines 376+) is outside this excerpt.
375 def stop_manage(self):
# CLI handlers for 'vstf-manager start/stop'.
# NOTE(review): the decorated function def lines (do_manager_start /
# do_manager_stop, original lines 389 and 392-393) and additional
# @cliutil.arg keyword lines (380-382, 385-387) are missing from this
# excerpt; the two Manager() calls below are the respective handler bodies.
379 @cliutil.arg("--monitor",
383 help="which ip to be monitored")
384 @cliutil.arg("--port",
388 help="rabbitmq conn server")
390 Manager().start_manage(args.monitor, args.port)
394 Manager().stop_manage()
# Entry-point fragment (the 'def main' line is outside this excerpt):
# configures logging, builds the CLI parser from this module's handlers
# and parses argv.
398 """this is for vstfctl"""
399 setup_logging(level=logging.INFO, log_file="/var/log/vstf/vstf-manager.log", clevel=logging.INFO)
400 parser = VstfParser(prog="vstf-manager", description="vstf manager command line")
401 parser.set_subcommand_parser(target=sys.modules[__name__])
402 args = parser.parse_args()