JIRA: BOTTLENECKS-29
[bottlenecks.git] / vstf / vstf / controller / api_server.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import uuid
12 import time
13 import os
14 import sys
15 import logging
16 import signal
17 import json
18
19 from vstf.common import unix, message, cliutil, excepts
20 from vstf.common.vstfcli import VstfParser
21 from vstf.common.log import setup_logging
22 from vstf.common import daemon
23 from vstf.controller.fabricant import Fabricant
24 from vstf.agent.env.basic.commandline import CommandLine
25 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
26 from vstf.controller.env_build.env_collect import EnvCollectApi
27 from vstf.controller.database.dbinterface import DbManage
28 import vstf.controller.sw_perf.performance as pf
29 from vstf.controller.settings.tester_settings import TesterSettings
30 from vstf.controller.settings.device_settings import DeviceSettings
31 from vstf.controller.settings.flows_settings import FlowsSettings
32 from vstf.controller.settings.mail_settings import MailSettings
33 from vstf.controller.settings.tool_settings import ToolSettings
34 from vstf.controller.settings.perf_settings import PerfSettings
35 from vstf.controller.sw_perf.perf_provider import PerfProvider
36 from vstf.controller.sw_perf.flow_producer import FlowsProducer
37 from vstf.controller.settings.forwarding_settings import ForwardingSettings
38 import vstf.controller.reporters.reporter as rp
39 import vstf.rpc_frame_work.rpc_producer as rpc
40 import vstf.common.constants as cst
41 import vstf.common.check as chk
42
# Module-wide logger; handlers/levels are configured by setup_logging() in main().
LOG = logging.getLogger(__name__)
# Shell-command helper instance.
# NOTE(review): `cmd` is not referenced anywhere in this file -- confirm it is
# imported from here by other modules before removing.
cmd = CommandLine()
45
46
class OpsChains(object):
    """Operations the VSTF manager dispatches by name.

    Owns the rabbitmq proxy (``chanl``), the result database (``dbconn``)
    and the environment-info collector (``collection``).  Every public
    method can be invoked from vstfadm through ``Manager.run()``.
    """

    def __init__(self, monitor, port):
        """The ops chains will setup the proxy to rabbitmq
        and setup a thread to watch the queues of rabbitmq

        :param monitor: rabbitmq server host/ip to listen to
        :param port: rabbitmq server port
        """
        LOG.info("VSTF Manager start to listen to %s", monitor)
        if not os.path.exists(cst.VSTFCPATH):
            os.mkdir(cst.VSTFCPATH)

        self.chanl = rpc.Server(host=monitor, port=port)
        self.dbconn = DbManage()
        self.collection = EnvCollectApi(self.chanl)

    @staticmethod
    def _load_json_cfg(config_file):
        """Load a JSON config file, closing the handle deterministically.

        :raises Exception: when *config_file* does not exist (same message
            the callers raised before this helper was extracted).
        """
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        # `with` guarantees the handle is closed; the previous
        # json.load(open(...)) pattern leaked file descriptors.
        with open(config_file) as cfg_file:
            return json.load(cfg_file)

    def list_devs(self, **kwargs):
        """List the NIC devices of the agent named by kwarg ``host``."""
        target = kwargs.get('host')
        if not target:
            respond = "the target is empty, not support now."
        else:
            respond = self.chanl.call(self.chanl.make_msg("list_nic_devices"), target)
        return respond

    def src_install(self, host, config_file):
        """Install from source on *host* as described by *config_file*."""
        cfg = self._load_json_cfg(config_file)
        msg = self.chanl.make_msg("src_install", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def create_images(self, host, config_file):
        """Create guest images on *host* as described by *config_file*."""
        cfg = self._load_json_cfg(config_file)
        msg = self.chanl.make_msg("create_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def clean_images(self, host, config_file):
        """Remove guest images on *host* as described by *config_file*."""
        cfg = self._load_json_cfg(config_file)
        msg = self.chanl.make_msg("clean_images", cfg=cfg)
        return self.chanl.call(msg, host, timeout=1000)

    def apply_model(self, host, model=None, config_file=None):
        """Build the test environment of *model* (or an explicit *config_file*)."""
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.build()
        return ret

    def disapply_model(self, host, model=None, config_file=None):
        """Tear down the test environment of *model* (or an explicit *config_file*)."""
        if config_file is None:
            config_file = "/etc/vstf/env/%s.json" % model
        if not os.path.exists(config_file):
            raise Exception("Can not found the config file.")
        env = Builder(self.chanl, config_file)
        ret = env.clean()
        return ret

    def list_tasks(self):
        """Return stored tasks, prefixed with a table header when non-empty."""
        ret = self.dbconn.query_tasks()
        head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
        if ret:
            ret = head + ret
        return ret

    def affctl_list(self, host):
        """Show the affinity-control state of *host*."""
        if not host:
            return "Need input the host"
        return Fabricant(host, self.chanl).affctl_list()

    def _create_task(self, scenario):
        """Create a DB task row, attach per-host env info; return the task id.

        :raises Exception: when the DB refuses the task or a host's first
            device entry is not keyed by bdf/iface/mac.
        """
        taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(cst.TIME_FORMAT),
                                         desc=scenario + "Test")
        LOG.info("new Task id:%s" % taskid)
        if -1 == taskid:
            raise Exception("DB create task failed.")

        device = DeviceSettings().settings
        hosts = [device["host"], device["tester"]]
        for host in hosts:
            LOG.info(host)

            devs = host["devs"][0]
            keys = ["bdf", "iface", "mac"]
            # next(iter(...)) instead of devs.keys()[0]: dict views are not
            # indexable on Python 3.
            key = next(iter(devs))
            if key in keys:
                name = devs[key]
            else:
                # Format the message (the old code passed the dict as a
                # second Exception arg and never interpolated it).
                raise Exception("error devs :%s" % devs)

            query = Fabricant(host["agent"], self.chanl)
            nic_info = query.get_device_detail(identity=name)

            LOG.info(nic_info)

            os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host["agent"])
            LOG.info(os_info)
            LOG.info(cpu_info)
            LOG.info(mem_info)
            LOG.info(hw_info)

            self.dbconn.add_host_2task(taskid,
                                       host["agent"],
                                       json.dumps(hw_info[cst.HW_INFO]),
                                       json.dumps(cpu_info[cst.CPU_INFO]),
                                       json.dumps(mem_info[cst.MEMORY_INFO]),
                                       nic_info["desc"],
                                       json.dumps(os_info[cst.OS_INFO]))

        self.dbconn.add_extent_2task(taskid, "ixgbe", "driver", "")
        self.dbconn.add_extent_2task(taskid, "OVS", "switch", "")
        return taskid

    def settings(self, head, tail):
        """Persist the head/tail forwarding IPs, keeping the stored namespaces."""
        forward_settings = ForwardingSettings()
        head_d = {
            "ip": head,
            "namespace": forward_settings.settings["head"]["namespace"]
        }
        tail_d = {
            "ip": tail,
            "namespace": forward_settings.settings["tail"]["namespace"]
        }
        LOG.info(head_d)
        LOG.info(tail_d)
        forward_settings.set_head(head_d)
        forward_settings.set_tail(tail_d)

    def report(self, rpath='./', mail_off=False, taskid=-1):
        """Generate the report for *taskid* (default -1 means the latest task)."""
        report = rp.Report(self.dbconn, rpath)
        if taskid == -1:
            taskid = self.dbconn.get_last_taskid()
        report.report(taskid, mail_off)
        info_str = "do report over"
        return info_str

    def run_perf_cmd(self, case, rpath='./', affctl=False, build_on=False, save_on=False, report_on=False,
                     mail_on=False):
        """Run one performance case; optionally build the env, save results
        to the DB and generate/mail a report.

        :param case: dict with keys 'case', 'tool', 'protocol', 'type', 'sizes'
        :return: the perf result, or an error string when params are invalid
        """
        LOG.info(case)
        LOG.info("build_on:%s report_on:%s mail_on:%s" % (build_on, report_on, mail_on))
        casetag = case['case']
        tool = case['tool']
        protocol = case['protocol']
        switch = "ovs"
        ttype = case['type']
        sizes = case['sizes']

        ret, ret_str = chk.check_case_params(protocol, ttype, tool)
        if not ret:
            return ret_str

        scenario = self.dbconn.query_scenario(casetag)
        LOG.info(scenario)
        if not scenario:
            LOG.warning("not support the case:%s", casetag)
            return

        config_file = os.path.join("/etc/vstf/env", scenario + ".json")

        LOG.info(config_file)
        env = Builder(self.chanl, config_file)
        if build_on:
            env.build()
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)

        perf = pf.Performance(self.chanl, provider)
        flow_producer.create(scenario, casetag)
        result = perf.run(tool, protocol, ttype, sizes, affctl)
        LOG.info(flows_settings.settings)
        LOG.info(result)
        if save_on:
            taskid = self._create_task(scenario)
            # NOTE(review): run_perf_file stores provider=None in this column
            # while this path stores the PerfProvider instance -- confirm
            # which the DB schema actually expects.
            testid = self.dbconn.add_test_2task(taskid, casetag, protocol, ttype, switch, provider, tool)
            LOG.info(testid)
            self.dbconn.add_data_2test(testid, result)
            if report_on:
                self.report(rpath, not mail_on, taskid)
        return result

    def run_perf_file(self, rpath='./', affctl=False, report_on=True, mail_on=True):
        """Run every case listed in PerfSettings, scenario by scenario,
        saving all results and reporting per scenario."""
        perf_settings = PerfSettings()
        flows_settings = FlowsSettings()
        tool_settings = ToolSettings()
        tester_settings = TesterSettings()
        flow_producer = FlowsProducer(self.chanl, flows_settings)
        provider = PerfProvider(flows_settings.settings, tool_settings.settings, tester_settings.settings)
        perf = pf.Performance(self.chanl, provider)
        tests = perf_settings.settings

        for scenario, cases in tests.items():
            LOG.info(scenario)
            if not cases:
                continue

            config_file = os.path.join("/etc/vstf/env", scenario + ".json")

            LOG.info(config_file)
            env = Builder(self.chanl, config_file)
            env.build()

            taskid = self._create_task(scenario)

            for case in cases:
                LOG.info(case)
                casetag = case['case']
                tool = case['tool']
                protocol = case['protocol']
                provider = None
                switch = "ovs"
                ttype = case['type']
                sizes = case['sizes']

                ret, ret_str = chk.check_case_params(protocol, ttype, tool)
                if not ret:
                    LOG.warning(ret_str)
                    continue

                flow_producer.create(scenario, casetag)
                result = perf.run(tool, protocol, ttype, sizes, affctl)
                LOG.info(result)

                testid = self.dbconn.add_test_2task(taskid, casetag, protocol, ttype, switch, provider, tool)
                LOG.info(testid)

                self.dbconn.add_data_2test(testid, result)

            if report_on:
                self.report(rpath, not mail_on, taskid)

        info_str = "do batch perf test successfully"
        return info_str

    def collect_host_info(self, target):
        """Collect os/cpu/memory/hw info of *target* via the collector."""
        if self.collection is not None:
            return self.collection.collect_host_info(target)
        else:
            return "collection is None"
293
294
class Manager(daemon.Daemon):
    """Daemon bridging vstfadm (unix socket) to OpsChains (rabbitmq proxy)."""

    def __init__(self):
        """
        The manager will create a socket for vstfadm.
        also the manager own a ops chains
        """
        super(Manager, self).__init__(cst.vstf_pid)
        # the connection of socket
        self.conn = None
        # the operations of manager
        self.ops = None
        # record the daemon run flag
        self.run_flag = True

    def deal_unknown_obj(self, obj):
        """Fallback formatter for a response object nothing else understands."""
        return "unknown response %s:%s" % (self, obj)

    def run(self):
        """Daemon main loop.

        Accepts vstfadm connections, decodes each message and dispatches
        it by name to a method on ``self.ops``; the return value (or the
        error text) is sent back with the request context.
        """
        signal.signal(signal.SIGTERM, self.daemon_die)
        # setup the socket server for communicating with vstfadm
        self.conn = unix.UdpServer()
        self.conn.bind()
        self.conn.listen()

        # accept the connection of vstfadm and recv the command
        # run the command from vstfadm and return the response
        while self.run_flag:
            conn, addr = self.conn.accept()
            LOG.debug("accept the conn: %(conn)s", {'conn': conn})

            # recv the msg until the conn break.
            while True:
                try:
                    data = message.recv(conn.recv)
                    LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
                    msg = message.decode(data)
                    body = message.get_body(msg)
                    context = message.get_context(msg)
                except RuntimeError:
                    # RuntimeError marks a normal connection close.
                    LOG.debug("manage catch the connection close!")
                    break
                except Exception as e:
                    LOG.error("Manager recv message from socket failed.")
                    self.daemon_die()
                    raise e

                try:
                    func = getattr(self.ops, body.get('method'))
                    LOG.info("Call function:%s, args:%s",
                             func.__name__, body.get('args'))
                    response = func(**body.get('args'))
                    LOG.info("response: %s", response)
                except excepts.UnsolvableExit as e:
                    msg = "The manager opps, exit now"
                    LOG.error(msg)
                    # the manager has no need to be continue, just return
                    # this msg and exit normal
                    self.daemon_die()
                    raise e
                except Exception as e:
                    # here just the function failed no need exit, just return the msg
                    msg = "Run function failed. [ %s ]" % (e)
                    response = msg
                    LOG.error(msg)
                try:
                    response = message.add_context(response, **context)
                    LOG.debug("Manager send the response: <%(r)s", {'r': response})
                    message.send(conn.send, message.encode(response))
                except Exception as e:
                    self.daemon_die()
                    raise e
            # close the connection when conn down
            conn.close()

    def daemon_die(self, signum=None, frame=None):
        """overwrite daemon.Daemon.daemon_die(self)

        Registered as the SIGTERM handler (called with signum/frame) and
        also invoked directly with no arguments on fatal errors in run(),
        hence the defaults -- the old two-mandatory-argument signature made
        those bare self.daemon_die() calls raise TypeError.
        """
        LOG.info("manage catch the signal %s to exit." % signum)
        if self.conn:
            # we can not close the conn direct, just tell manager to stop accept
            self.run_flag = False

        if self.ops:
            # stop the ops's proxy
            # maybe happen AttributeError: 'BlockingConnection' object has no attribute 'disconnect'
            # this a know bug in pika. fix in 0.9.14 release
            try:
                self.ops.chanl.close()
            except AttributeError:
                LOG.warning("The connection close happens attribute error")

    def start_manage(self, monitor="localhost", port=5672):
        """Create the OpsChains (opens the rabbitmq proxy) then daemonize."""
        # create manager's ops chains here will create a proxy to rabbitmq
        self.ops = OpsChains(monitor, port)
        self.start()

    def stop_manage(self):
        """Stop the running daemon."""
        self.stop()
399
400
@cliutil.arg("--monitor",
             dest="monitor",
             default="localhost",
             action="store",
             help="which ip to be monitored")
@cliutil.arg("--port",
             dest="port",
             default="5672",
             action="store",
             help="rabbitmq conn server")
def do_start(args):
    """Start the vstf manager daemon.

    argparse delivers --port as a string (the default is the string
    "5672"), so coerce it to int before it reaches the rabbitmq proxy.
    """
    Manager().start_manage(args.monitor, int(args.port))
413
414
def do_stop(args):
    """Stop the vstf manager daemon (``args`` is unused but required by
    the subcommand dispatch convention)."""
    manager = Manager()
    manager.stop_manage()
417
418
def main():
    """this is for vstfctl"""
    setup_logging(level=logging.INFO,
                  log_file="/var/log/vstf/vstf-manager.log",
                  clevel=logging.INFO)
    # Build the CLI from every do_* function defined in this module,
    # then dispatch to whichever subcommand the user picked.
    parser = VstfParser(prog="vstf-manager",
                        description="vstf manager command line")
    parser.set_subcommand_parser(target=sys.modules[__name__])
    parsed = parser.parse_args()
    parsed.func(parsed)