2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.openflow.controller.impl;
18 import com.google.common.base.Strings;
19 import com.google.common.collect.ArrayListMultimap;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Multimap;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Modified;
26 import org.apache.felix.scr.annotations.Property;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onosproject.cfg.ComponentConfigService;
31 import org.onosproject.net.driver.DefaultDriverProviderService;
32 import org.onosproject.net.driver.DriverService;
33 import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
34 import org.onosproject.openflow.controller.Dpid;
35 import org.onosproject.openflow.controller.OpenFlowController;
36 import org.onosproject.openflow.controller.OpenFlowEventListener;
37 import org.onosproject.openflow.controller.OpenFlowPacketContext;
38 import org.onosproject.openflow.controller.OpenFlowSwitch;
39 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
40 import org.onosproject.openflow.controller.PacketListener;
41 import org.onosproject.openflow.controller.RoleState;
42 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
43 import org.osgi.service.component.ComponentContext;
44 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsEntry;
45 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsReply;
46 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
47 import org.projectfloodlight.openflow.protocol.OFExperimenter;
48 import org.projectfloodlight.openflow.protocol.OFFactories;
49 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
50 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
51 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsEntry;
52 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsReply;
53 import org.projectfloodlight.openflow.protocol.OFGroupStatsEntry;
54 import org.projectfloodlight.openflow.protocol.OFGroupStatsReply;
55 import org.projectfloodlight.openflow.protocol.OFMessage;
56 import org.projectfloodlight.openflow.protocol.OFPacketIn;
57 import org.projectfloodlight.openflow.protocol.OFPortDesc;
58 import org.projectfloodlight.openflow.protocol.OFPortStatsEntry;
59 import org.projectfloodlight.openflow.protocol.OFPortStatsReply;
60 import org.projectfloodlight.openflow.protocol.OFPortStatus;
61 import org.projectfloodlight.openflow.protocol.OFStatsReply;
62 import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
63 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
64 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
68 import java.util.Collection;
69 import java.util.Collections;
70 import java.util.Dictionary;
71 import java.util.HashMap;
72 import java.util.LinkedList;
73 import java.util.List;
76 import java.util.concurrent.ConcurrentHashMap;
77 import java.util.concurrent.CopyOnWriteArraySet;
78 import java.util.concurrent.ExecutorService;
79 import java.util.concurrent.Executors;
80 import java.util.concurrent.locks.Lock;
81 import java.util.concurrent.locks.ReentrantLock;
83 import static org.onlab.util.Tools.get;
84 import static org.onlab.util.Tools.groupedThreads;
86 @Component(immediate = true)
88 public class OpenFlowControllerImpl implements OpenFlowController {
// Default OpenFlow port and worker-thread count; used both as the @Property
// defaults and as the initial field values below.
89 private static final int DEFAULT_OFPORT = 6633;
90 private static final int DEFAULT_WORKER_THREADS = 16;
92 private static final Logger log =
93 LoggerFactory.getLogger(OpenFlowControllerImpl.class);
// Passed to the controller when it is started.
95 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96 protected DriverService driverService;
98 // Reference exists merely for sequencing purposes, to ensure drivers are loaded
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected DefaultDriverProviderService defaultDriverProviderService;
// Used to register and unregister this component's configurable properties.
102 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103 protected ComponentConfigService cfgService;
105 @Property(name = "openflowPort", intValue = DEFAULT_OFPORT,
106 label = "Port number used by OpenFlow protocol; default is 6633")
107 private int openflowPort = DEFAULT_OFPORT;
109 @Property(name = "workerThreads", intValue = DEFAULT_WORKER_THREADS,
110 label = "Number of controller worker threads; default is 16")
111 private int workerThreads = DEFAULT_WORKER_THREADS;
// 32-thread pool on which OFMessageHandler tasks are run for most message types.
113 private final ExecutorService executorMsgs =
114 Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
// Separate 4-thread pool; processPacket submits one kind of message to it
// instead of executorMsgs.
116 private final ExecutorService executorBarrier =
117 Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
// Switch registries keyed by datapath id: every connected switch, plus the
// switches recorded in the active-master and active-equal maps.
119 protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
120 new ConcurrentHashMap<>();
121 protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
122 new ConcurrentHashMap<>();
123 protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
124 new ConcurrentHashMap<>();
// Agent instance handed to the controller on start.
126 protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
127 protected Set<OpenFlowSwitchListener> ofSwitchListener = new CopyOnWriteArraySet<>();
// Packet listeners keyed by the priority supplied at registration.
// NOTE(review): ArrayListMultimap is not synchronized — confirm that
// registration and packet dispatch cannot run concurrently.
129 protected Multimap<Integer, PacketListener> ofPacketListener =
130 ArrayListMultimap.create();
132 protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
// Per-switch buffers that accumulate statistics entries across replies until
// a reply without the REPLY_MORE flag arrives (see the publish*Stats methods).
134 protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
135 ArrayListMultimap.create();
137 protected Multimap<Dpid, OFGroupStatsEntry> fullGroupStats =
138 ArrayListMultimap.create();
140 protected Multimap<Dpid, OFGroupDescStatsEntry> fullGroupDescStats =
141 ArrayListMultimap.create();
143 protected Multimap<Dpid, OFPortStatsEntry> fullPortStats =
144 ArrayListMultimap.create();
// The controller instance that is configured and started by this component.
146 private final Controller ctrl = new Controller();
149 public void activate(ComponentContext context) {
150 cfgService.registerProperties(getClass());
151 Map<String, String> properties = readComponentConfiguration(context);
152 ctrl.setConfigParams(properties);
153 ctrl.start(agent, driverService);
157 public void deactivate() {
158 cfgService.unregisterProperties(getClass(), false);
163 * Extracts properties from the component configuration context.
165 * @param context the component context
167 private Map<String, String> readComponentConfiguration(ComponentContext context) {
168 Dictionary<?, ?> properties = context.getProperties();
169 Map<String, String> outProperties = new HashMap<>();
171 String port = get(properties, "openflowPort");
172 if (!Strings.isNullOrEmpty(port)) {
173 outProperties.put("openflowport", port);
176 String thread = get(properties, "workerThreads");
177 if (!Strings.isNullOrEmpty(thread)) {
178 outProperties.put("workerthreads", thread);
181 return outProperties;
185 public void modified(ComponentContext context) {
186 Map<String, String> properties = readComponentConfiguration(context);
188 ctrl.setConfigParams(properties);
189 ctrl.start(agent, driverService);
193 public Iterable<OpenFlowSwitch> getSwitches() {
194 return connectedSwitches.values();
198 public Iterable<OpenFlowSwitch> getMasterSwitches() {
199 return activeMasterSwitches.values();
203 public Iterable<OpenFlowSwitch> getEqualSwitches() {
204 return activeEqualSwitches.values();
208 public OpenFlowSwitch getSwitch(Dpid dpid) {
209 return connectedSwitches.get(dpid);
213 public OpenFlowSwitch getMasterSwitch(Dpid dpid) {
214 return activeMasterSwitches.get(dpid);
218 public OpenFlowSwitch getEqualSwitch(Dpid dpid) {
219 return activeEqualSwitches.get(dpid);
223 public void addListener(OpenFlowSwitchListener listener) {
224 if (!ofSwitchListener.contains(listener)) {
225 this.ofSwitchListener.add(listener);
230 public void removeListener(OpenFlowSwitchListener listener) {
231 this.ofSwitchListener.remove(listener);
235 public void addPacketListener(int priority, PacketListener listener) {
236 ofPacketListener.put(priority, listener);
240 public void removePacketListener(PacketListener listener) {
241 ofPacketListener.values().remove(listener);
245 public void addEventListener(OpenFlowEventListener listener) {
246 ofEventListener.add(listener);
250 public void removeEventListener(OpenFlowEventListener listener) {
251 ofEventListener.remove(listener);
255 public void write(Dpid dpid, OFMessage msg) {
256 this.getSwitch(dpid).sendMsg(msg);
/**
 * Dispatches an OpenFlow message associated with the switch {@code dpid}
 * according to {@code msg.getType()}. Depending on the branch, switch or
 * packet listeners are invoked directly on the calling thread, or the message
 * is wrapped in an OFMessageHandler and submitted to executorMsgs or
 * executorBarrier.
 * NOTE(review): the case labels of both switch statements are not visible in
 * this listing, so the branch comments below describe what each branch does,
 * not which message type selects it.
 *
 * @param dpid the datapath id the message is associated with
 * @param msg the OpenFlow message to process
 */
260 public void processPacket(Dpid dpid, OFMessage msg) {
// Holders for the aggregated statistics returned by the publish*Stats helpers.
// NOTE(review): portStats is never assigned in the visible code.
261 Collection<OFFlowStatsEntry> flowStats;
262 Collection<OFGroupStatsEntry> groupStats;
263 Collection<OFGroupDescStatsEntry> groupDescStats;
264 Collection<OFPortStatsEntry> portStats;
266 switch (msg.getType()) {
// Forward the port status message to every registered switch listener.
268 for (OpenFlowSwitchListener l : ofSwitchListener) {
269 l.portChanged(dpid, (OFPortStatus) msg);
// Tell every switch listener that the switch has changed.
273 for (OpenFlowSwitchListener l : ofSwitchListener) {
274 l.switchChanged(dpid);
// Build a packet context for the switch and pass it to each packet listener
// in the iteration order of ofPacketListener.values().
278 OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
279 .packetContextFromPacketIn(this.getSwitch(dpid),
281 for (PacketListener p : ofPacketListener.values()) {
282 p.handlePacket(pktCtx);
285 // TODO: Consider using separate threadpool for sensitive messages.
286 // ie. Back to back error could cause us to starve.
// Deliver the message unchanged to the event listeners on the executorMsgs pool.
289 executorMsgs.submit(new OFMessageHandler(dpid, msg));
// Statistics reply: dispatch a second time on the statistics type.
292 OFStatsReply reply = (OFStatsReply) msg;
293 switch (reply.getStatsType()) {
295 for (OpenFlowSwitchListener l : ofSwitchListener) {
296 l.switchChanged(dpid);
// Flow statistics: publishFlowStats buffers the entries per switch; only a
// non-null result is repackaged into one reply and forwarded.
// NOTE(review): unlike the two group branches below, the rebuilt flow reply
// does not copy the xid of the original reply.
300 flowStats = publishFlowStats(dpid, (OFFlowStatsReply) reply);
301 if (flowStats != null) {
302 OFFlowStatsReply.Builder rep =
303 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
304 rep.setEntries(Lists.newLinkedList(flowStats));
305 executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
// Group statistics: same buffering scheme; the rebuilt reply keeps the original xid.
309 groupStats = publishGroupStats(dpid, (OFGroupStatsReply) reply);
310 if (groupStats != null) {
311 OFGroupStatsReply.Builder rep =
312 OFFactories.getFactory(msg.getVersion()).buildGroupStatsReply();
313 rep.setEntries(Lists.newLinkedList(groupStats));
314 rep.setXid(reply.getXid());
315 executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
// Group description statistics: same buffering scheme; xid is preserved.
319 groupDescStats = publishGroupDescStats(dpid,
320 (OFGroupDescStatsReply) reply);
321 if (groupDescStats != null) {
322 OFGroupDescStatsReply.Builder rep =
323 OFFactories.getFactory(msg.getVersion()).buildGroupDescStatsReply();
324 rep.setEntries(Lists.newLinkedList(groupDescStats));
325 rep.setXid(reply.getXid());
326 executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
// The next two branches forward the statistics reply as received, without buffering.
330 executorMsgs.submit(new OFMessageHandler(dpid, reply));
333 executorMsgs.submit(new OFMessageHandler(dpid, reply));
336 if (reply instanceof OFCalientFlowStatsReply) {
337 // Convert Calient flow statistics to regular flow stats
338 // TODO: parse remaining fields such as power levels etc. when we have proper monitoring API
339 OFFlowStatsReply.Builder fsr = getSwitch(dpid).factory().buildFlowStatsReply();
340 List<OFFlowStatsEntry> entries = new LinkedList<>();
341 for (OFCalientFlowStatsEntry entry : ((OFCalientFlowStatsReply) msg).getEntries()) {
343 // Single instruction, i.e., output to port
344 OFActionOutput action = OFFactories
345 .getFactory(msg.getVersion())
348 .setPort(entry.getOutPort())
350 OFInstruction instruction = OFFactories
351 .getFactory(msg.getVersion())
353 .applyActions(Collections.singletonList(action));
// Copy the remaining fields of the Calient entry onto a standard flow stats entry.
354 OFFlowStatsEntry fs = getSwitch(dpid).factory().buildFlowStatsEntry()
355 .setMatch(entry.getMatch())
356 .setTableId(entry.getTableId())
357 .setDurationSec(entry.getDurationSec())
358 .setDurationNsec(entry.getDurationNsec())
359 .setPriority(entry.getPriority())
360 .setIdleTimeout(entry.getIdleTimeout())
361 .setHardTimeout(entry.getHardTimeout())
362 .setFlags(entry.getFlags())
363 .setCookie(entry.getCookie())
364 .setInstructions(Collections.singletonList(instruction))
368 fsr.setEntries(entries);
// The converted reply then goes through the same buffering path as regular
// flow statistics (again without copying the xid).
370 flowStats = publishFlowStats(dpid, fsr.build());
371 if (flowStats != null) {
372 OFFlowStatsReply.Builder rep =
373 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
374 rep.setEntries(Lists.newLinkedList(flowStats));
375 executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
// Forward the reply unchanged.
378 executorMsgs.submit(new OFMessageHandler(dpid, reply));
382 log.warn("Discarding unknown stats reply type {}", reply.getStatsType());
// This branch uses the dedicated barrier pool rather than executorMsgs.
387 executorBarrier.submit(new OFMessageHandler(dpid, msg));
// Experimenter message: only experimenter id 0x748771 is handled.
390 long experimenter = ((OFExperimenter) msg).getExperimenter();
391 if (experimenter == 0x748771) {
392 // LINC-OE port stats
// Translate the circuit port status into a standard port status built with
// the switch's own factory, then notify the switch listeners.
393 OFCircuitPortStatus circuitPortStatus = (OFCircuitPortStatus) msg;
394 OFPortStatus.Builder portStatus = this.getSwitch(dpid).factory().buildPortStatus();
395 OFPortDesc.Builder portDesc = this.getSwitch(dpid).factory().buildPortDesc();
396 portDesc.setPortNo(circuitPortStatus.getPortNo())
397 .setHwAddr(circuitPortStatus.getHwAddr())
398 .setName(circuitPortStatus.getName())
399 .setConfig(circuitPortStatus.getConfig())
400 .setState(circuitPortStatus.getState());
401 portStatus.setReason(circuitPortStatus.getReason()).setDesc(portDesc.build());
402 for (OpenFlowSwitchListener l : ofSwitchListener) {
403 l.portChanged(dpid, portStatus.build());
// NOTE(review): the format string has one placeholder but two arguments are
// passed, so msg is not rendered in the log line.
406 log.warn("Handling experimenter type {} not yet implemented",
407 ((OFExperimenter) msg).getExperimenter(), msg);
// Fallback for message types with no dedicated branch.
411 log.warn("Handling message type {} not yet implemented {}",
416 private synchronized Collection<OFFlowStatsEntry> publishFlowStats(Dpid dpid,
417 OFFlowStatsReply reply) {
418 //TODO: Get rid of synchronized
419 fullFlowStats.putAll(dpid, reply.getEntries());
420 if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
421 return fullFlowStats.removeAll(dpid);
426 private synchronized Collection<OFGroupStatsEntry> publishGroupStats(Dpid dpid,
427 OFGroupStatsReply reply) {
428 //TODO: Get rid of synchronized
429 fullGroupStats.putAll(dpid, reply.getEntries());
430 if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
431 return fullGroupStats.removeAll(dpid);
436 private synchronized Collection<OFGroupDescStatsEntry> publishGroupDescStats(Dpid dpid,
437 OFGroupDescStatsReply reply) {
438 //TODO: Get rid of synchronized
439 fullGroupDescStats.putAll(dpid, reply.getEntries());
440 if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
441 return fullGroupDescStats.removeAll(dpid);
446 private synchronized Collection<OFPortStatsEntry> publishPortStats(Dpid dpid,
447 OFPortStatsReply reply) {
448 fullPortStats.putAll(dpid, reply.getEntries());
449 if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
450 return fullPortStats.removeAll(dpid);
/**
 * Handles a role request for the connected switch with the given datapath id.
 * NOTE(review): only the switch lookup and the "not connected" log statement
 * are visible in this listing; the condition guarding that log and the call
 * that applies the role are not shown — confirm against the full file.
 *
 * @param dpid the datapath id of the switch
 * @param role the requested role
 */
456 public void setRole(Dpid dpid, RoleState role) {
457 final OpenFlowSwitch sw = getSwitch(dpid);
// Emitted when the request is ignored because the switch is not connected.
459 log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
466 * Implementation of an OpenFlow Agent which is responsible for
467 * keeping track of connected switches and the state in which
// Agent that records switch connection and activation state in the enclosing
// controller's maps (connectedSwitches, activeMasterSwitches,
// activeEqualSwitches) and notifies the registered OpenFlowSwitchListeners.
// NOTE(review): return statements, null checks and any locking are not
// visible in this listing; the comments below cover the visible statements only.
470 public class OpenFlowSwitchAgent implements OpenFlowAgent {
472 private final Logger log = LoggerFactory.getLogger(OpenFlowSwitchAgent.class);
// NOTE(review): no lock()/unlock() call on switchLock is visible here —
// confirm which methods hold it.
473 private final Lock switchLock = new ReentrantLock();
// Registers a newly connected switch: logs an error if an entry already
// exists for the dpid, otherwise stores the switch and walks the switch
// listeners.
476 public boolean addConnectedSwitch(Dpid dpid, OpenFlowSwitch sw) {
478 if (connectedSwitches.get(dpid) != null) {
479 log.error("Trying to add connectedSwitch but found a previous "
480 + "value for dpid: {}", dpid);
483 log.info("Added switch {}", dpid);
484 connectedSwitches.put(dpid, sw);
485 for (OpenFlowSwitchListener l : ofSwitchListener) {
// Checks the preconditions for activating a switch: it must be in
// connectedSwitches and must not already be in either active map. Each
// violation is logged as an error.
493 public boolean validActivation(Dpid dpid) {
494 if (connectedSwitches.get(dpid) == null) {
495 log.error("Trying to activate switch but is not in "
496 + "connected switches: dpid {}. Aborting ..",
500 if (activeMasterSwitches.get(dpid) != null ||
501 activeEqualSwitches.get(dpid) != null) {
502 log.error("Trying to activate switch but it is already "
503 + "activated: dpid {}. Found in activeMaster: {} "
504 + "Found in activeEqual: {}. Aborting ..", new Object[]{
506 (activeMasterSwitches.get(dpid) == null) ? 'N' : 'Y',
507 (activeEqualSwitches.get(dpid) == null) ? 'N' : 'Y'});
// Records the switch in activeMasterSwitches; validActivation is consulted first.
515 public boolean addActivatedMasterSwitch(Dpid dpid, OpenFlowSwitch sw) {
518 if (!validActivation(dpid)) {
521 activeMasterSwitches.put(dpid, sw);
// Records the switch in activeEqualSwitches; validActivation is consulted first.
529 public boolean addActivatedEqualSwitch(Dpid dpid, OpenFlowSwitch sw) {
532 if (!validActivation(dpid)) {
535 activeEqualSwitches.put(dpid, sw);
536 log.info("Added Activated EQUAL Switch {}", dpid);
// Moves a switch into activeMasterSwitches. The switch is taken from
// activeEqualSwitches, with getSwitch(dpid) as an alternative source; an
// error is logged when it cannot be found in the controller cache.
544 public void transitionToMasterSwitch(Dpid dpid) {
547 if (activeMasterSwitches.containsKey(dpid)) {
550 OpenFlowSwitch sw = activeEqualSwitches.remove(dpid);
552 sw = getSwitch(dpid);
554 log.error("Transition to master called on sw {}, but switch "
555 + "was not found in controller-cache", dpid);
559 log.info("Transitioned switch {} to MASTER", dpid);
560 activeMasterSwitches.put(dpid, sw);
// Mirror image of transitionToMasterSwitch: moves a switch into
// activeEqualSwitches, taking it from activeMasterSwitches or getSwitch(dpid).
568 public void transitionToEqualSwitch(Dpid dpid) {
571 if (activeEqualSwitches.containsKey(dpid)) {
574 OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
576 sw = getSwitch(dpid);
578 log.error("Transition to equal called on sw {}, but switch "
579 + "was not found in controller-cache", dpid);
583 log.info("Transitioned switch {} to EQUAL", dpid);
584 activeEqualSwitches.put(dpid, sw);
// Drops the switch from the connected map and from the active maps, then
// notifies every switch listener of the removal.
592 public void removeConnectedSwitch(Dpid dpid) {
593 connectedSwitches.remove(dpid);
594 OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
596 log.debug("sw was null for {}", dpid);
597 sw = activeEqualSwitches.remove(dpid);
599 for (OpenFlowSwitchListener l : ofSwitchListener) {
600 l.switchRemoved(dpid);
// Hands the message to the enclosing controller's processPacket.
605 public void processMessage(Dpid dpid, OFMessage m) {
606 processPacket(dpid, m);
// Relays a role reply to every registered switch listener.
610 public void returnRoleReply(Dpid dpid, RoleState requested, RoleState response) {
611 for (OpenFlowSwitchListener l : ofSwitchListener) {
612 l.receivedRoleReply(dpid, requested, response);
617 private final class OFMessageHandler implements Runnable {
619 private final OFMessage msg;
620 private final Dpid dpid;
622 public OFMessageHandler(Dpid dpid, OFMessage msg) {
629 for (OpenFlowEventListener listener : ofEventListener) {
630 listener.handleMessage(dpid, msg);