5 # see license for license details
9 from vstf.controller.settings.device_settings import DeviceSettings
10 from vstf.controller.settings.forwarding_settings import ForwardingSettings
11 from vstf.controller.settings.cpu_settings import CpuSettings
12 from vstf.controller.fabricant import Fabricant
13 from vstf.controller.settings.flows_settings import FlowsSettings
14 import vstf.common.constants as cst
16 LOG = logging.getLogger(__name__)
# NOTE(review): this listing carries an embedded line number at the start of
# every line, has lost its indentation, and skips some numbers (e.g. 25-27),
# so statements appear to be missing from view. The comments below describe
# only the statements that are visible.
#
# FlowsProducer reads the forwarding, device and CPU settings and uses them
# to populate a flows-settings object (self._perf) with senders, receivers,
# watchers, namespaces, a CPU-listen host and the flows of a scenario/case.
19 class FlowsProducer(object):
# Keep the flows-settings object that create() fills in, and load the
# `.settings` attribute of the forwarding, device and CPU settings classes.
# NOTE(review): `conn` is not stored on any visible line, yet get_dev() reads
# self._conn and self._devs_map -- presumably assigned on lines missing from
# this view; confirm.
20 def __init__(self, conn, flows_settings):
21 self._perf = flows_settings
22 self._forwarding = ForwardingSettings().settings
23 self._device = DeviceSettings().settings
24 self._cpu = CpuSettings().settings
# Resolve one device description. item[0] selects an entry of the device
# settings, item[1] selects an entry of that entry's "devs". The result is
# cached in self._devs_map under the key (agent, name) and returned.
28 def get_dev(self, item):
29 agent = self._device[item[0]]["agent"]
30 devs = self._device[item[0]]["devs"][item[1]]
# Candidate identifier kinds. How `name` is derived from `devs` and `keys`
# is not visible in this view.
32 keys = ["bdf", "iface", "mac"]
# NOTE(review): the message is never %-formatted here; Exception receives the
# format string and `devs` as two separate arguments.
39 raise Exception("error devs :%s", devs)
# Only query when (agent, name) has not been cached yet.
# NOTE(review): has_key() exists only on Python 2 dicts; `key in mapping` is
# the portable spelling (assuming _devs_map is a dict -- confirm).
42 if not self._devs_map.has_key((agent, name)):
# Build a Fabricant for this agent and connection, then request the verbose
# device information identified by `name`.
43 query = Fabricant(agent, self._conn)
# NOTE(review): clean_all_namespace() runs on every cache miss, before the
# query; its effect is defined in Fabricant -- confirm this is intended.
44 query.clean_all_namespace()
45 dev_info = query.get_device_verbose(identity=name)
# A non-dict reply leads to the failure message below being built; what is
# done with `err` afterwards is not visible in this view.
46 if not isinstance(dev_info, dict):
47 err = "get device detail failed, agent:%s net:%s" % (agent, name)
# bdf / iface / mac are copied from the reply; the enclosing literal that
# these entries belong to is only partly visible.
52 "bdf": dev_info["bdf"],
53 "iface": dev_info["iface"],
54 "mac": dev_info["mac"],
# Remember the assembled description so later lookups skip the query.
60 self._devs_map[(agent, name)] = dev
63 return self._devs_map[(agent, name)]
# Visible part of a dict literal pairing the host agent from the device
# settings with the CPU "affctl" setting. Its enclosing definition header is
# not visible; create() calls self.get_host(), presumably this one -- confirm.
67 "agent": self._device["host"]["agent"],
68 "affctl": self._cpu["affctl"]
# Populate self._perf for the given scenario (a key of the forwarding
# settings) and case (a key of cst.CASE_ACTOR_MAP).
72 def create(self, scenario, case):
# Flow indexes configured for this scenario; each one is resolved through
# get_dev() and appended to flows_infos.
# NOTE(review): the initialisation of `flows_infos` is not visible here.
74 flows_indexes = self._forwarding[scenario]["flows"]
76 for index in flows_indexes:
# The condition guarding this raise is not visible in this view.
78 raise Exception("error flows %s" % flows_indexes)
79 dev = self.get_dev(index)
80 flows_infos.append(dev)
# Merge the forwarding "head" settings into the first flow's 'dev' dict and
# the "tail" settings into the last flow's 'dev' dict (in place).
82 flows_infos[0]['dev'].update(self._forwarding["head"])
83 flows_infos[-1]['dev'].update(self._forwarding["tail"])
# The case decides which flow positions act as senders and receivers, and
# supplies the value later passed to set_flows().
87 actor_info = cst.CASE_ACTOR_MAP[case]
# clear_all() is called before anything is added -- presumably it resets
# previously stored entries; confirm against FlowsSettings.
89 self._perf.clear_all()
# Each sender value is used as an index into flows_infos.
90 senders = actor_info["senders"]
92 for sender in senders:
93 dev = flows_infos[sender]
95 self._perf.add_senders(dev)
# Receivers are handled the same way as senders.
97 receivers = actor_info["receivers"]
98 for receiver in receivers:
99 dev = flows_infos[receiver]
101 self._perf.add_receivers(dev)
# Watchers come from the scenario's forwarding settings rather than from the
# case's actor map.
103 watchers = self._forwarding[scenario]["watchers"]
104 for watcher in watchers:
105 dev = flows_infos[watcher]
107 self._perf.add_watchers(dev)
# NOTE(review): `namespaces` is not assigned on any visible line.
110 for namespace in namespaces:
111 dev = flows_infos[namespace]
113 self._perf.add_namespaces(dev)
# Register the host returned by get_host() as a CPU-listen target.
115 host = self.get_host()
117 self._perf.add_cpu_listens(host)
# Finally record the case's "flows" value on the settings object.
119 self._perf.set_flows(actor_info["flows"])
# Driver statements that build a FlowsProducer and call create() once.
# NOTE(review): the header of the definition these statements belong to is
# not visible in this view, and `scenario` / `case` are not assigned on any
# visible line.
124 from vstf.rpc_frame_work.rpc_producer import Server
125 from vstf.common.log import setup_logging
# Configure logging to /var/log/vstf/vstf-producer.log with both `level` and
# `clevel` set to INFO.
126 setup_logging(level=logging.INFO, log_file="/var/log/vstf/vstf-producer.log", clevel=logging.INFO)
# NOTE(review): the server address is hard-coded.
128 conn = Server("192.168.188.10")
129 flow_settings = FlowsSettings()
130 flow_producer = FlowsProducer(conn, flow_settings)
133 flow_producer.create(scenario, case)
# Script entry guard; its body is not visible in this view.
136 if __name__ == '__main__':