Merge "Change PTL information in INFO"
[bottlenecks.git] / testsuites / vstf / vstf_scripts / vstf / controller / api_server.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10
11 import uuid
12 import time
13 import os
14 import sys
15 import logging
16 import signal
17 import json
18
19 from vstf.common import unix, message, cliutil, excepts
20 from vstf.common.vstfcli import VstfParser
21 from vstf.common.log import setup_logging
22 from vstf.common import daemon
23 from vstf.controller.fabricant import Fabricant
24 from vstf.agent.env.basic.commandline import CommandLine
25 from vstf.controller.env_build.env_build import EnvBuildApi as Builder
26 from vstf.controller.env_build.env_collect import EnvCollectApi
27 from vstf.controller.database.dbinterface import DbManage
28 import vstf.controller.sw_perf.performance as pf
29 from vstf.controller.settings.tester_settings import TesterSettings
30 from vstf.controller.settings.device_settings import DeviceSettings
31 from vstf.controller.settings.flows_settings import FlowsSettings
32 from vstf.controller.settings.mail_settings import MailSettings
33 from vstf.controller.settings.tool_settings import ToolSettings
34 from vstf.controller.settings.perf_settings import PerfSettings
35 from vstf.controller.sw_perf.perf_provider import PerfProvider
36 from vstf.controller.sw_perf.flow_producer import FlowsProducer
37 from vstf.controller.settings.forwarding_settings import ForwardingSettings
38 import vstf.controller.reporters.reporter as rp
39 import vstf.rpc_frame_work.rpc_producer as rpc
40 import vstf.common.constants as cst
41 import vstf.common.check as chk
42
43 LOG = logging.getLogger(__name__)
44 cmd = CommandLine()
45
46
47 class OpsChains(object):
48
49     def __init__(self, monitor, port):
50         """The ops chains will setup the proxy to rabbitmq
51         and setup a thread to watch the queues of rabbitmq
52
53         """
54         LOG.info("VSTF Manager start to listen to %s", monitor)
55         if not os.path.exists(cst.VSTFCPATH):
56             os.mkdir(cst.VSTFCPATH)
57
58         self.chanl = rpc.Server(host=monitor, port=port)
59         self.dbconn = DbManage()
60         self.collection = EnvCollectApi(self.chanl)
61
62     def list_devs(self, **kwargs):
63         target = kwargs.get('host')
64         if not target:
65             respond = "the target is empty, not support now."
66         else:
67             respond = self.chanl.call(
68                 self.chanl.make_msg("list_nic_devices"), target)
69         return respond
70
71     def src_install(self, host, config_file):
72         if not os.path.exists(config_file):
73             raise Exception("Can not found the config file.")
74         cfg = json.load(open(config_file))
75         msg = self.chanl.make_msg("src_install", cfg=cfg)
76         return self.chanl.call(msg, host, timeout=1000)
77
78     def create_images(self, host, config_file):
79         if not os.path.exists(config_file):
80             raise Exception("Can not found the config file.")
81         cfg = json.load(open(config_file))
82         msg = self.chanl.make_msg("create_images", cfg=cfg)
83         return self.chanl.call(msg, host, timeout=1000)
84
85     def clean_images(self, host, config_file):
86         if not os.path.exists(config_file):
87             raise Exception("Can not found the config file.")
88         cfg = json.load(open(config_file))
89         msg = self.chanl.make_msg("clean_images", cfg=cfg)
90         return self.chanl.call(msg, host, timeout=1000)
91
92     def apply_model(self, host, model=None, config_file=None):
93         if config_file is None:
94             config_file = "/etc/vstf/env/%s.json" % model
95         if not os.path.exists(config_file):
96             raise Exception("Can not found the config file.")
97         env = Builder(self.chanl, config_file)
98         ret = env.build()
99         return ret
100
101     def disapply_model(self, host, model=None, config_file=None):
102         if config_file is None:
103             config_file = "/etc/vstf/env/%s.json" % model
104         if not os.path.exists(config_file):
105             raise Exception("Can not found the config file.")
106         env = Builder(self.chanl, config_file)
107         ret = env.clean()
108         return ret
109
110     def list_tasks(self):
111         ret = self.dbconn.query_tasks()
112         head = [["Task ID", "Task Name", "Task Date", "Task Remarks"]]
113         if ret:
114             ret = head + ret
115         return ret
116
117     def affctl_list(self, host):
118         if not host:
119             return "Need input the host"
120         return Fabricant(host, self.chanl).affctl_list()
121
122     def _create_task(self, scenario):
123         taskid = self.dbconn.create_task(str(uuid.uuid4()), time.strftime(
124             cst.TIME_FORMAT), desc=scenario + "Test")
125         LOG.info("new Task id:%s" % taskid)
126         if -1 == taskid:
127             raise Exception("DB create task failed.")
128
129         device = DeviceSettings().settings
130         hosts = [device["host"], device["tester"]]
131         for host in hosts:
132             LOG.info(host)
133
134             devs = host["devs"][0]
135             keys = ["bdf", "iface", "mac"]
136             key = devs.keys()[0]
137             if key in keys:
138                 name = devs[key]
139             else:
140                 raise Exception("error devs :%s", devs)
141
142             query = Fabricant(host["agent"], self.chanl)
143             nic_info = query.get_device_detail(identity=name)
144
145             LOG.info(nic_info)
146
147             os_info, cpu_info, mem_info, hw_info = self.collection.collect_host_info(host[
148                                                                                      "agent"])
149             LOG.info(os_info)
150             LOG.info(cpu_info)
151             LOG.info(mem_info)
152             LOG.info(hw_info)
153
154             self.dbconn.add_host_2task(taskid,
155                                        host["agent"],
156                                        json.dumps(hw_info[cst.HW_INFO]),
157                                        json.dumps(cpu_info[cst.CPU_INFO]),
158                                        json.dumps(mem_info[cst.MEMORY_INFO]),
159                                        nic_info["desc"],
160                                        json.dumps(os_info[cst.OS_INFO]))
161
162         self.dbconn.add_extent_2task(taskid, "ixgbe", "driver", "")
163         self.dbconn.add_extent_2task(taskid, "OVS", "switch", "")
164         return taskid
165
166     def settings(self, head, tail):
167
168         forward_settings = ForwardingSettings()
169         head_d = {
170             "ip": head,
171             "namespace": forward_settings.settings["head"]["namespace"]
172         }
173         tail_d = {
174             "ip": tail,
175             "namespace": forward_settings.settings["tail"]["namespace"]
176         }
177         LOG.info(head_d)
178         LOG.info(tail_d)
179         forward_settings.set_head(head_d)
180         forward_settings.set_tail(tail_d)
181
182     def report(self, rpath='./', mail_off=False, taskid=-1):
183         report = rp.Report(self.dbconn, rpath)
184         if taskid == -1:
185             taskid = self.dbconn.get_last_taskid()
186         report.report(taskid, mail_off)
187         info_str = "do report over"
188         return info_str
189
190     def run_perf_cmd(
191             self,
192             case,
193             rpath='./',
194             affctl=False,
195             build_on=False,
196             save_on=False,
197             report_on=False,
198             mail_on=False):
199         LOG.info(case)
200         LOG.info(
201             "build_on:%s report_on:%s mail_on:%s" %
202             (build_on, report_on, mail_on))
203         casetag = case['case']
204         tool = case['tool']
205         protocol = case['protocol']
206         switch = "ovs"
207         provider = None
208         ttype = case['type']
209         sizes = case['sizes']
210
211         ret, ret_str = chk.check_case_params(protocol, ttype, tool)
212         if not ret:
213             return ret_str
214
215         scenario = self.dbconn.query_scenario(casetag)
216         LOG.info(scenario)
217         if not scenario:
218             LOG.warn("not support the case:%s", casetag)
219             return
220
221         config_file = os.path.join("/etc/vstf/env", scenario + ".json")
222
223         LOG.info(config_file)
224         env = Builder(self.chanl, config_file)
225         if build_on:
226             env.build()
227         flows_settings = FlowsSettings()
228         tool_settings = ToolSettings()
229         tester_settings = TesterSettings()
230         flow_producer = FlowsProducer(self.chanl, flows_settings)
231         provider = PerfProvider(
232             flows_settings.settings,
233             tool_settings.settings,
234             tester_settings.settings)
235
236         perf = pf.Performance(self.chanl, provider)
237         flow_producer.create(scenario, casetag)
238         result = perf.run(tool, protocol, ttype, sizes, affctl)
239         LOG.info(flows_settings.settings)
240         LOG.info(result)
241         if save_on:
242             taskid = self._create_task(scenario)
243             testid = self.dbconn.add_test_2task(
244                 taskid, casetag, protocol, ttype, switch, provider, tool)
245             LOG.info(testid)
246             self.dbconn.add_data_2test(testid, result)
247             if report_on:
248                 self.report(rpath, not mail_on, taskid)
249         return result
250
251     def run_perf_file(
252             self,
253             rpath='./',
254             affctl=False,
255             report_on=True,
256             mail_on=True):
257         perf_settings = PerfSettings()
258         flows_settings = FlowsSettings()
259         tool_settings = ToolSettings()
260         tester_settings = TesterSettings()
261         flow_producer = FlowsProducer(self.chanl, flows_settings)
262         provider = PerfProvider(
263             flows_settings.settings,
264             tool_settings.settings,
265             tester_settings.settings)
266         perf = pf.Performance(self.chanl, provider)
267         tests = perf_settings.settings
268
269         for scenario, cases in tests.items():
270             LOG.info(scenario)
271             if not cases:
272                 continue
273
274             config_file = os.path.join("/etc/vstf/env", scenario + ".json")
275
276             LOG.info(config_file)
277             env = Builder(self.chanl, config_file)
278             env.build()
279
280             taskid = self._create_task(scenario)
281
282             for case in cases:
283                 LOG.info(case)
284                 casetag = case['case']
285                 tool = case['tool']
286                 protocol = case['protocol']
287                 provider = None
288                 switch = "ovs"
289                 ttype = case['type']
290                 sizes = case['sizes']
291
292                 ret, ret_str = chk.check_case_params(protocol, ttype, tool)
293                 if not ret:
294                     LOG.warn(ret_str)
295                     continue
296
297                 flow_producer.create(scenario, casetag)
298                 result = perf.run(tool, protocol, ttype, sizes, affctl)
299                 LOG.info(result)
300
301                 testid = self.dbconn.add_test_2task(
302                     taskid, casetag, protocol, ttype, switch, provider, tool)
303                 LOG.info(testid)
304
305                 self.dbconn.add_data_2test(testid, result)
306
307             if report_on:
308                 self.report(rpath, not mail_on, taskid)
309
310         info_str = "do batch perf test successfully"
311         return info_str
312
313     def collect_host_info(self, target):
314         if self.collection is not None:
315             return self.collection.collect_host_info(target)
316         else:
317             return "collection is None"
318
319
class Manager(daemon.Daemon):
    """Daemon that serves vstfadm requests over a local socket and
    dispatches them to an OpsChains instance."""

    def __init__(self):
        """
        The manager will create a socket for vstfadm.
        also the manager own a ops chains
        """
        super(Manager, self).__init__(cst.vstf_pid)
        # the connection of socket
        self.conn = None
        # the operations of manager
        self.ops = None
        # record the daemon run flag
        self.run_flag = True

    def deal_unknown_obj(self, obj):
        """Fallback formatter for a response nobody recognised."""
        return "unknown response %s:%s" % (self, obj)

    def run(self):
        """Main daemon loop: serve vstfadm requests until told to stop."""
        signal.signal(signal.SIGTERM, self.daemon_die)
        # setup the socket server for communicating with vstfadm;
        # let any failure propagate - the daemon cannot work without it
        # (the original try/except merely re-raised, losing the traceback)
        self.conn = unix.UdpServer()
        self.conn.bind()
        self.conn.listen()

        # accept the connection of vstfadm and recv the command
        # run the command from vstfadm and return the response
        while self.run_flag:
            conn, addr = self.conn.accept()
            LOG.debug("accept the conn: %(conn)s", {'conn': conn})

            # recv the msg until the conn break.
            while True:
                try:
                    data = message.recv(conn.recv)
                    LOG.debug("Manager recv the msg: %(msg)s", {'msg': data})
                    msg = message.decode(data)
                    body = message.get_body(msg)
                    context = message.get_context(msg)
                except RuntimeError:
                    # message layer signals a closed peer with RuntimeError
                    LOG.debug("manage catch the connection close!")
                    break
                except Exception as e:
                    LOG.error("Manager recv message from socket failed.")
                    self.daemon_die()
                    raise e

                try:
                    # dispatch to the ops chains by method name
                    func = getattr(self.ops, body.get('method'))
                    LOG.info("Call function:%s, args:%s",
                             func.__name__, body.get('args'))
                    response = func(**body.get('args'))
                    LOG.info("response: %s", response)
                except excepts.UnsolvableExit as e:
                    msg = "The manager opps, exit now"
                    LOG.error(msg)
                    # the manager has no need to be continue, just return
                    # this msg and exit normal
                    self.daemon_die()
                    raise e
                except Exception as e:
                    # here just the function failed no need exit, just return
                    # the msg
                    msg = "Run function failed. [ %s ]" % (e)
                    response = msg
                    LOG.error(msg)
                try:
                    response = message.add_context(response, **context)
                    LOG.debug(
                        "Manager send the response: <%(r)s", {
                            'r': response})
                    message.send(conn.send, message.encode(response))
                except Exception as e:
                    self.daemon_die()
                    raise e
            # close the connection when conn down
            conn.close()

    def daemon_die(self, signum=None, frame=None):
        """overwrite daemon.Daemon.daemon_die(self)

        Registered as the SIGTERM handler, but also called directly with
        no arguments from run() on fatal errors - hence the argument
        defaults (the original mandatory signum/frame made those direct
        calls raise TypeError instead of shutting down cleanly).
        """
        LOG.info("manage catch the signal %s to exit." % signum)
        if self.conn:
            # we can not close the conn direct, just tell manager to stop
            # accept
            self.run_flag = False

        if self.ops:
            # stop the ops's proxy
            # maybe happen AttributeError: 'BlockingConnection' object has
            # no attribute 'disconnect'
            # this a known bug in pika, fixed in the 0.9.14 release
            try:
                self.ops.chanl.close()
            except AttributeError:
                LOG.warning("The connection close happens attribute error")

    def start_manage(self, monitor="localhost", port=5672):
        """Create the ops chains (opens the rabbitmq proxy) and start
        the daemon."""
        self.ops = OpsChains(monitor, port)
        self.start()

    def stop_manage(self):
        """Stop the daemon."""
        self.stop()
429
430
@cliutil.arg("--monitor",
             dest="monitor",
             default="localhost",
             action="store",
             help="which ip to be monitored")
@cliutil.arg("--port",
             dest="port",
             default="5672",
             action="store",
             help="rabbitmq conn server")
def do_start(args):
    """Start the manager daemon.

    The CLI delivers the port as a string (default "5672") while
    Manager.start_manage expects the int 5672; convert here so the
    rabbitmq connection receives a proper integer port.
    """
    Manager().start_manage(args.monitor, int(args.port))
443
444
def do_stop(args):
    """Stop the running vstf-manager daemon."""
    manager = Manager()
    manager.stop_manage()
447
448
def main():
    """this is for vstfctl"""
    setup_logging(level=logging.INFO,
                  log_file="/var/log/vstf/vstf-manager.log",
                  clevel=logging.INFO)
    parser = VstfParser(prog="vstf-manager",
                        description="vstf manager command line")
    # expose this module's do_* functions as subcommands
    parser.set_subcommand_parser(target=sys.modules[__name__])
    parsed = parser.parse_args()
    parsed.func(parsed)