d3547011abbb555dc508ebfb9f714edd75c557ff
[bottlenecks.git] / vstf / vstf / controller / api_server.py
1 import uuid
2 import time
3 import os
4 import sys
5 import logging
6 import signal
7 import json
8
9 from vstf.common import unix, message, cliutil, excepts
10 from vstf.common.vstfcli import VstfParser
11 from vstf.common.log import setup_logging
12 from vstf.common import daemon
13 from vstf.rpc_frame_work import rpc_producer
14 from vstf.controller.fabricant import Fabricant
15 from vstf.agent.env.basic.commandline import CommandLine
16 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
17 from vstf.controller.env_build.env_collect import EnvCollectApi
18 from vstf.controller.database.dbinterface import DbManage
19 import vstf.controller.sw_perf.performance as pf
20 from vstf.controller.settings.tester_settings import TesterSettings
21 from vstf.controller.settings.device_settings import DeviceSettings
22 from vstf.controller.settings.flows_settings import FlowsSettings
23 from vstf.controller.settings.mail_settings import MailSettings
24 from vstf.controller.settings.tool_settings import ToolSettings
25 from vstf.controller.settings.perf_settings import PerfSettings
26 from vstf.controller.sw_perf.perf_provider import PerfProvider
27 from vstf.controller.sw_perf.flow_producer import FlowsProducer
28 import vstf.controller.reporters.reporter as rp
29 import vstf.common.constants as cst
30 import vstf.common.check as chk
31
# Module-level logger for the api server.
LOG = logging.getLogger(__name__)
# Shared command-line helper instance.
# NOTE(review): 'cmd' appears unused in this file — confirm against other
# modules before removing.
cmd = CommandLine()
35
class OpsChains(object):
    def __init__(self, monitor, port):
        """The ops chains will setup the proxy to rabbitmq
        and setup a thread to watch the queues of rabbitmq.

        :param monitor: host/ip of the rabbitmq server to listen to
        :param port: rabbitmq connection port
        """
        super(OpsChains, self).__init__()
        # make sure the vstf working directory exists before anything
        # below tries to use it
        if not os.path.exists(cst.VSTFCPATH):
            os.mkdir(cst.VSTFCPATH)

        LOG.info("VSTF Manager start to listen to %s", monitor)
        self.chanl = rpc_producer.Server(host=monitor, port=port)
        self.dbconn = DbManage()
        self.collection = EnvCollectApi(self.chanl)

    @staticmethod
    def _load_json_config(config_file):
        """Load a json configuration file.

        :raises Exception: when the file does not exist.
        """
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        # use a context manager so the file handle is always closed
        # (the original json.load(open(...)) leaked the descriptor)
        with open(config_file) as handle:
            return json.load(handle)

    @staticmethod
    def _unpack_case(case):
        """Split a perf case dict into its individual parameters.

        :return: tuple (casetag, tool, protocol, profile, ttype, sizes)
        """
        return (case['case'], case['tool'], case['protocol'],
                case['profile'], case['type'], case['sizes'])

    def list_devs(self, **kwargs):
        """Ask the agent on kwargs['host'] to list its nic devices."""
        target = kwargs.get('host')
        if not target:
            respond = "the target is empty, not support now."
        else:
            respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
        return respond

    def src_install(self, host, config_file):
        """Install sources described by config_file on the given host."""
        cfg = self._load_json_config(config_file)
        msg = self.chanl.make_msg("src_install", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def create_images(self, host, config_file):
        """Create vm images described by config_file on the given host."""
        cfg = self._load_json_config(config_file)
        msg = self.chanl.make_msg("create_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def clean_images(self, host, config_file):
        """Remove vm images described by config_file on the given host."""
        cfg = self._load_json_config(config_file)
        msg = self.chanl.make_msg("clean_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def apply_model(self, host, model=None, config_file=None):
        """Build the test environment for a model.

        When config_file is not given it is derived from the model name.
        NOTE(review): the 'host' parameter is unused here — kept for
        interface compatibility; confirm before removing.
        """
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.build()
        return ret

    def disapply_model(self, host, model=None, config_file=None):
        """Tear down the test environment for a model (inverse of apply_model)."""
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.clean()
        return ret

    def list_tasks(self):
        """Return all recorded tasks, prefixed with a table header row."""
        ret = self.dbconn.query_tasks()
        head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
        if ret:
            ret = head + ret
        return ret

    def affctl_list(self, host):
        """List the affinity control state of the given host's agent."""
        if not host:
            return "Need input the host"
        return Fabricant(host, self.chanl).affctl_list()

    def _create_task(self, scenario):
        """Create a DB task record and attach host environment details.

        Collects nic/os/cpu/memory/hw information from both the tested
        host and the tester and stores it with the new task.

        :return: the new task id
        :raises Exception: when the DB refuses the task or a device entry
            has an unexpected key.
        """
        taskid = self.dbconn.create_task(str(uuid.uuid4()),
                                         time.strftime(cst.TIME_FORMAT),
                                         desc=scenario + "Test")
        LOG.info("new Task id:%s", taskid)
        if -1 == taskid:
            raise Exception("DB create task failed.")

        device = DeviceSettings().settings
        hosts = [device["host"], device["tester"]]
        for host in hosts:
            LOG.info(host)

            devs = host["devs"][0]
            keys = ["bdf", "iface", "mac"]
            # dict.keys() is a non-indexable view on python3: materialize
            # it before taking the first key
            key = list(devs.keys())[0]
            if key in keys:
                name = devs[key]
            else:
                # bug fix: Exception does not %-interpolate extra args,
                # format the message explicitly
                raise Exception("error devs :%s" % devs)

            query = Fabricant(host["agent"], self.chanl)
            nic_info = query.get_device_detail(identity=name)

            LOG.info(nic_info)

            os_info, cpu_info, mem_info, hw_info = \
                self.collection.collect_host_info(host["agent"])
            LOG.info(os_info)
            LOG.info(cpu_info)
            LOG.info(mem_info)
            LOG.info(hw_info)

            self.dbconn.add_host_2task(taskid,
                                       host["agent"],
                                       json.dumps(hw_info[cst.HW_INFO]),
                                       json.dumps(cpu_info[cst.CPU_INFO]),
                                       json.dumps(mem_info[cst.MEMORY_INFO]),
                                       nic_info["desc"],
                                       json.dumps(os_info[cst.OS_INFO]))

        self.dbconn.add_extent_2task(taskid, "CETH", "driver", "version 2.0")
        self.dbconn.add_extent_2task(taskid, "EVS", "switch", "version 3.0")
        return taskid

    def settings(self, mail=False, perf=False):
        """Interactively edit the mail and/or perf settings."""
        LOG.info("mail:%s, perf:%s", mail, perf)
        if mail:
            MailSettings().input()
        if perf:
            PerfSettings().input()

    def report(self, rpath='./', mail_off=False, taskid=-1):
        """Generate the report for a task (the last task when taskid is -1)."""
        report = rp.Report(self.dbconn, rpath)
        if taskid == -1:
            taskid = self.dbconn.get_last_taskid()
        report.report(taskid, mail_off)
        info_str = "do report over"
        return info_str

    def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False,
                     save_on=False, report_on=False, mail_on=False):
        """Run one perf case and optionally build the env, save the result
        to the DB and generate/mail the report.

        :return: the perf result, or an error string when the case
            parameters are invalid, or None for an unknown case tag.
        """
        LOG.info(case)
        LOG.info("build_on:%s report_on:%s mail_on:%s",
                 build_on, report_on, mail_on)
        casetag, tool, protocol, profile, ttype, sizes = self._unpack_case(case)

        ret, ret_str = chk.check_case_params(protocol, ttype, tool)
        if not ret:
            return ret_str

        scenario = self.dbconn.query_scenario(casetag)
        LOG.info(scenario)
        if not scenario:
            LOG.warning("not support the case:%s", casetag)
            return

        config_file = os.path.join("/etc/vstf/env", scenario + ".json")

        LOG.info(config_file)
        env = Builder(self.chanl, config_file)
        if build_on:
            env.build()
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings,
                                tool_settings.settings,
                                tester_settings.settings)

        perf = pf.Performance(self.chanl, provider)
        flow_producer.create(scenario, casetag)
        result = perf.run(tool, protocol, ttype, sizes, affctl)
        LOG.info(flows_settings.settings)
        LOG.info(result)
        if save_on:
            taskid = self._create_task(scenario)
            testid = self.dbconn.add_test_2task(taskid, casetag, protocol,
                                                profile, ttype, tool)
            LOG.info(testid)
            self.dbconn.add_data_2test(testid, result)
            if report_on:
                self.report(rpath, not mail_on, taskid)
        return result

    def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
        """Run every case from the perf settings file, scenario by scenario,
        building the environment for each scenario and saving all results.
        """
        perf_settings = PerfSettings()
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings,
                                tool_settings.settings,
                                tester_settings.settings)
        perf = pf.Performance(self.chanl, provider)
        tests = perf_settings.settings

        for scenario, cases in tests.items():
            LOG.info(scenario)
            if not cases:
                continue

            config_file = os.path.join("/etc/vstf/env", scenario + ".json")

            LOG.info(config_file)
            env = Builder(self.chanl, config_file)
            env.build()

            taskid = self._create_task(scenario)

            for case in cases:
                LOG.info(case)
                casetag, tool, protocol, profile, ttype, sizes = \
                    self._unpack_case(case)

                ret, ret_str = chk.check_case_params(protocol, ttype, tool)
                if not ret:
                    # invalid case parameters: skip this case but keep going
                    LOG.warning(ret_str)
                    continue

                flow_producer.create(scenario, casetag)
                result = perf.run(tool, protocol, ttype, sizes, affctl)
                LOG.info(result)

                testid = self.dbconn.add_test_2task(taskid, casetag, protocol,
                                                    profile, ttype, tool)
                LOG.info(testid)

                self.dbconn.add_data_2test(testid, result)

            if report_on:
                self.report(rpath, not mail_on, taskid)

        info_str = "do batch perf test successfully"
        return info_str

    def collect_host_info(self, target):
        """Collect os/cpu/memory/hw info from the target host's agent."""
        if self.collection is not None:
            return self.collection.collect_host_info(target)
        else:
            return "collection is None"
271
272
class Manager(daemon.Daemon):
    def __init__(self):
        """
        The manager will create a socket for vstfadm.
        also the manager own a ops chains
        """
        super(Manager, self).__init__(cst.vstf_pid)
        # the unix socket server used to talk with vstfadm
        self.conn = None
        # the operations of manager (created in start_manage)
        self.ops = None
        # record the daemon run flag
        self.run_flag = True

    def deal_unknown_obj(self, obj):
        """Fallback response for a message object we cannot interpret."""
        return "unknown response %s" % obj

    def run(self):
        """Daemon main loop: accept vstfadm connections, dispatch each
        received message to the matching OpsChains method and send back
        the response."""
        signal.signal(signal.SIGTERM, self.daemon_die)
        # setup the socket server for communicating with vstfadm; a
        # failure here is fatal, so let the exception propagate directly
        # (the original try/except only re-raised)
        self.conn = unix.UdpServer()
        self.conn.bind()
        self.conn.listen()

        # accept the connection of vstfadm and recv the command
        # run the command from vstfadm and return the response
        while self.run_flag:
            conn, addr = self.conn.accept()
            LOG.debug("accept the conn: %(conn)s", {'conn': conn})

            # recv the msg until the conn break.
            while True:
                try:
                    data = message.recv(conn.recv)
                    LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
                    msg = message.decode(data)
                    body = message.get_body(msg)
                    context = message.get_context(msg)
                except RuntimeError:
                    # the peer closed the connection: leave the recv loop
                    LOG.debug("manage catch the connection close!")
                    break
                except Exception as e:
                    LOG.error("Manager recv message from socket failed.")
                    self.daemon_die()
                    raise e

                try:
                    # dispatch to the OpsChains method named in the message
                    func = getattr(self.ops, body.get('method'))
                    LOG.info("Call function:%s, args:%s",
                             func.__name__, body.get('args'))
                    response = func(**body.get('args'))
                    LOG.info("response: %s", response)
                except excepts.UnsolvableExit as e:
                    msg = "The manager opps, exit now"
                    LOG.error(msg)
                    # the manager has no need to be continue, just return
                    # this msg and exit normal
                    self.daemon_die()
                    raise e
                except Exception as e:
                    # here just the function failed no need exit, just return the msg
                    msg = "Run function failed. [ %s ]" % (e)
                    response = msg
                    LOG.error(msg)
                try:
                    response = message.add_context(response, **context)
                    LOG.debug("Manager send the response: <%(r)s", {'r': response})
                    message.send(conn.send, message.encode(response))
                except Exception as e:
                    self.daemon_die()
                    raise e
            # close the connection when conn down
            conn.close()

    def daemon_die(self, signum=None, frame=None):
        """overwrite daemon.Daemon.daemon_die(self)

        Bug fix: run() also calls this directly with no arguments on
        fatal errors, so signum/frame must have defaults — the original
        mandatory parameters made every internal self.daemon_die() call
        raise TypeError instead of shutting down.
        """
        LOG.info("manage catch the signal %s to exit.", signum)
        if self.conn:
            # we can not close the conn direct, just tell manager to stop accept
            self.run_flag = False

        if self.ops:
            # stop the ops's proxy
            # maybe happen AttributeError: 'BlockingConnection' object has no
            # attribute 'disconnect'; this is a known bug in pika, fixed in
            # the 0.9.14 release
            try:
                self.ops.chanl.close()
            except AttributeError:
                LOG.warning("The connection close happens attribute error")

    def start_manage(self, monitor="localhost", port=5672):
        """Create the ops chains (opens the rabbitmq proxy) and start
        the daemon; the original try/except only re-raised, so errors
        now propagate directly."""
        self.ops = OpsChains(monitor, port)
        self.start()

    def stop_manage(self):
        """Stop the daemon."""
        self.stop()
377
378
@cliutil.arg("--monitor",
             dest="monitor",
             default="localhost",
             action="store",
             help="which ip to be monitored")
@cliutil.arg("--port",
             dest="port",
             default="5672",
             action="store",
             help="rabbitmq conn server")
def do_start(args):
    """Start the vstf manager daemon listening on the given rabbitmq server."""
    manager = Manager()
    manager.start_manage(args.monitor, args.port)
391
392
def do_stop(args):
    """Stop the running vstf manager daemon."""
    manager = Manager()
    manager.stop_manage()
395
396
def main():
    """Entry point for vstfctl: set up logging, build the sub-command
    parser from this module and dispatch to the chosen handler."""
    setup_logging(level=logging.INFO,
                  log_file="/var/log/vstf/vstf-manager.log",
                  clevel=logging.INFO)
    cli = VstfParser(prog="vstf-manager",
                     description="vstf manager command line")
    cli.set_subcommand_parser(target=sys.modules[__name__])
    parsed = cli.parse_args()
    parsed.func(parsed)