b97c336284d6047e8a321e36139a60f84d95bf29
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.openflow.controller.impl;
17
18 import com.google.common.collect.ArrayListMultimap;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Multimap;
21 import org.apache.felix.scr.annotations.Activate;
22 import org.apache.felix.scr.annotations.Component;
23 import org.apache.felix.scr.annotations.Deactivate;
24 import org.apache.felix.scr.annotations.Modified;
25 import org.apache.felix.scr.annotations.Property;
26 import org.apache.felix.scr.annotations.Reference;
27 import org.apache.felix.scr.annotations.ReferenceCardinality;
28 import org.apache.felix.scr.annotations.Service;
29 import org.onosproject.cfg.ComponentConfigService;
30 import org.onosproject.net.driver.DefaultDriverProviderService;
31 import org.onosproject.net.driver.DriverService;
32 import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
33 import org.onosproject.openflow.controller.Dpid;
34 import org.onosproject.openflow.controller.OpenFlowController;
35 import org.onosproject.openflow.controller.OpenFlowEventListener;
36 import org.onosproject.openflow.controller.OpenFlowPacketContext;
37 import org.onosproject.openflow.controller.OpenFlowSwitch;
38 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
39 import org.onosproject.openflow.controller.PacketListener;
40 import org.onosproject.openflow.controller.RoleState;
41 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
42 import org.osgi.service.component.ComponentContext;
43 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsEntry;
44 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsReply;
45 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
46 import org.projectfloodlight.openflow.protocol.OFExperimenter;
47 import org.projectfloodlight.openflow.protocol.OFFactories;
48 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
49 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
50 import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
51 import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
52 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsEntry;
53 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsReply;
54 import org.projectfloodlight.openflow.protocol.OFGroupStatsEntry;
55 import org.projectfloodlight.openflow.protocol.OFGroupStatsReply;
56 import org.projectfloodlight.openflow.protocol.OFMessage;
57 import org.projectfloodlight.openflow.protocol.OFPacketIn;
58 import org.projectfloodlight.openflow.protocol.OFPortDesc;
59 import org.projectfloodlight.openflow.protocol.OFPortStatsEntry;
60 import org.projectfloodlight.openflow.protocol.OFPortStatsReply;
61 import org.projectfloodlight.openflow.protocol.OFPortStatus;
62 import org.projectfloodlight.openflow.protocol.OFStatsReply;
63 import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
64 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
65 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
66 import org.slf4j.Logger;
67 import org.slf4j.LoggerFactory;
68
69 import java.util.Collection;
70 import java.util.Collections;
71 import java.util.LinkedList;
72 import java.util.List;
73 import java.util.Set;
74 import java.util.concurrent.ConcurrentHashMap;
75 import java.util.concurrent.CopyOnWriteArraySet;
76 import java.util.concurrent.ExecutorService;
77 import java.util.concurrent.Executors;
78 import java.util.concurrent.locks.Lock;
79 import java.util.concurrent.locks.ReentrantLock;
80
81 import static org.onlab.util.Tools.groupedThreads;
82
83 @Component(immediate = true)
84 @Service
85 public class OpenFlowControllerImpl implements OpenFlowController {
86     private static final String DEFAULT_OFPORT = "6633,6653";
87     private static final int DEFAULT_WORKER_THREADS = 16;
88
89     private static final Logger log =
90             LoggerFactory.getLogger(OpenFlowControllerImpl.class);
91
92     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93     protected DriverService driverService;
94
95     // References exists merely for sequencing purpose to assure drivers are loaded
96     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97     protected DefaultDriverProviderService defaultDriverProviderService;
98
99     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100     protected ComponentConfigService cfgService;
101
102     @Property(name = "openflowPorts", value = DEFAULT_OFPORT,
103             label = "Port numbers (comma separated) used by OpenFlow protocol; default is 6633,6653")
104     private String openflowPorts = DEFAULT_OFPORT;
105
106     @Property(name = "workerThreads", intValue = DEFAULT_WORKER_THREADS,
107             label = "Number of controller worker threads; default is 16")
108     private int workerThreads = DEFAULT_WORKER_THREADS;
109
110     protected ExecutorService executorMsgs =
111         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
112
113     private final ExecutorService executorBarrier =
114         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
115
116     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
117             new ConcurrentHashMap<>();
118     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
119             new ConcurrentHashMap<>();
120     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
121             new ConcurrentHashMap<>();
122
123     protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
124     protected Set<OpenFlowSwitchListener> ofSwitchListener = new CopyOnWriteArraySet<>();
125
126     protected Multimap<Integer, PacketListener> ofPacketListener =
127             ArrayListMultimap.create();
128
129     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
130
131     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
132             ArrayListMultimap.create();
133
134     protected Multimap<Dpid, OFTableStatsEntry> fullTableStats =
135             ArrayListMultimap.create();
136
137     protected Multimap<Dpid, OFGroupStatsEntry> fullGroupStats =
138             ArrayListMultimap.create();
139
140     protected Multimap<Dpid, OFGroupDescStatsEntry> fullGroupDescStats =
141             ArrayListMultimap.create();
142
143     protected Multimap<Dpid, OFPortStatsEntry> fullPortStats =
144             ArrayListMultimap.create();
145
146     private final Controller ctrl = new Controller();
147
148     @Activate
149     public void activate(ComponentContext context) {
150         cfgService.registerProperties(getClass());
151         ctrl.setConfigParams(context.getProperties());
152         ctrl.start(agent, driverService);
153     }
154
155     @Deactivate
156     public void deactivate() {
157         cfgService.unregisterProperties(getClass(), false);
158         ctrl.stop();
159     }
160
161     @Modified
162     public void modified(ComponentContext context) {
163         ctrl.stop();
164         ctrl.setConfigParams(context.getProperties());
165         ctrl.start(agent, driverService);
166     }
167
168     @Override
169     public Iterable<OpenFlowSwitch> getSwitches() {
170         return connectedSwitches.values();
171     }
172
173     @Override
174     public Iterable<OpenFlowSwitch> getMasterSwitches() {
175         return activeMasterSwitches.values();
176     }
177
178     @Override
179     public Iterable<OpenFlowSwitch> getEqualSwitches() {
180         return activeEqualSwitches.values();
181     }
182
183     @Override
184     public OpenFlowSwitch getSwitch(Dpid dpid) {
185         return connectedSwitches.get(dpid);
186     }
187
188     @Override
189     public OpenFlowSwitch getMasterSwitch(Dpid dpid) {
190         return activeMasterSwitches.get(dpid);
191     }
192
193     @Override
194     public OpenFlowSwitch getEqualSwitch(Dpid dpid) {
195         return activeEqualSwitches.get(dpid);
196     }
197
198     @Override
199     public void addListener(OpenFlowSwitchListener listener) {
200         if (!ofSwitchListener.contains(listener)) {
201             this.ofSwitchListener.add(listener);
202         }
203     }
204
205     @Override
206     public void removeListener(OpenFlowSwitchListener listener) {
207         this.ofSwitchListener.remove(listener);
208     }
209
210     @Override
211     public void addPacketListener(int priority, PacketListener listener) {
212         ofPacketListener.put(priority, listener);
213     }
214
215     @Override
216     public void removePacketListener(PacketListener listener) {
217         ofPacketListener.values().remove(listener);
218     }
219
220     @Override
221     public void addEventListener(OpenFlowEventListener listener) {
222         ofEventListener.add(listener);
223     }
224
225     @Override
226     public void removeEventListener(OpenFlowEventListener listener) {
227         ofEventListener.remove(listener);
228     }
229
230     @Override
231     public void write(Dpid dpid, OFMessage msg) {
232         this.getSwitch(dpid).sendMsg(msg);
233     }
234
235     @Override
236     public void processPacket(Dpid dpid, OFMessage msg) {
237         Collection<OFFlowStatsEntry> flowStats;
238         Collection<OFTableStatsEntry> tableStats;
239         Collection<OFGroupStatsEntry> groupStats;
240         Collection<OFGroupDescStatsEntry> groupDescStats;
241         Collection<OFPortStatsEntry> portStats;
242
243         switch (msg.getType()) {
244         case PORT_STATUS:
245             for (OpenFlowSwitchListener l : ofSwitchListener) {
246                 l.portChanged(dpid, (OFPortStatus) msg);
247             }
248             break;
249         case FEATURES_REPLY:
250             for (OpenFlowSwitchListener l : ofSwitchListener) {
251                 l.switchChanged(dpid);
252             }
253             break;
254         case PACKET_IN:
255             OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
256             .packetContextFromPacketIn(this.getSwitch(dpid),
257                     (OFPacketIn) msg);
258             for (PacketListener p : ofPacketListener.values()) {
259                 p.handlePacket(pktCtx);
260             }
261             break;
262         // TODO: Consider using separate threadpool for sensitive messages.
263         //    ie. Back to back error could cause us to starve.
264         case FLOW_REMOVED:
265         case ERROR:
266             executorMsgs.submit(new OFMessageHandler(dpid, msg));
267             break;
268         case STATS_REPLY:
269             OFStatsReply reply = (OFStatsReply) msg;
270             switch (reply.getStatsType()) {
271                 case PORT_DESC:
272                     for (OpenFlowSwitchListener l : ofSwitchListener) {
273                         l.switchChanged(dpid);
274                     }
275                     break;
276                 case FLOW:
277                     flowStats = publishFlowStats(dpid, (OFFlowStatsReply) reply);
278                     if (flowStats != null) {
279                         OFFlowStatsReply.Builder rep =
280                                 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
281                         rep.setEntries(Lists.newLinkedList(flowStats));
282                         rep.setXid(reply.getXid());
283                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
284                     }
285                     break;
286                 case TABLE:
287                     tableStats = publishTableStats(dpid, (OFTableStatsReply) reply);
288                     if (tableStats != null) {
289                         OFTableStatsReply.Builder rep =
290                                 OFFactories.getFactory(msg.getVersion()).buildTableStatsReply();
291                         rep.setEntries(Lists.newLinkedList(tableStats));
292                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
293                     }
294                     break;
295                 case GROUP:
296                     groupStats = publishGroupStats(dpid, (OFGroupStatsReply) reply);
297                     if (groupStats != null) {
298                         OFGroupStatsReply.Builder rep =
299                                 OFFactories.getFactory(msg.getVersion()).buildGroupStatsReply();
300                         rep.setEntries(Lists.newLinkedList(groupStats));
301                         rep.setXid(reply.getXid());
302                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
303                     }
304                     break;
305                 case GROUP_DESC:
306                     groupDescStats = publishGroupDescStats(dpid,
307                             (OFGroupDescStatsReply) reply);
308                     if (groupDescStats != null) {
309                         OFGroupDescStatsReply.Builder rep =
310                                 OFFactories.getFactory(msg.getVersion()).buildGroupDescStatsReply();
311                         rep.setEntries(Lists.newLinkedList(groupDescStats));
312                         rep.setXid(reply.getXid());
313                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
314                     }
315                     break;
316                 case PORT:
317                     executorMsgs.submit(new OFMessageHandler(dpid, reply));
318                     break;
319                 case METER:
320                     executorMsgs.submit(new OFMessageHandler(dpid, reply));
321                     break;
322                 case EXPERIMENTER:
323                     if (reply instanceof OFCalientFlowStatsReply) {
324                         // Convert Calient flow statistics to regular flow stats
325                         // TODO: parse remaining fields such as power levels etc. when we have proper monitoring API
326                         OFFlowStatsReply.Builder fsr = getSwitch(dpid).factory().buildFlowStatsReply();
327                         List<OFFlowStatsEntry> entries = new LinkedList<>();
328                         for (OFCalientFlowStatsEntry entry : ((OFCalientFlowStatsReply) msg).getEntries()) {
329
330                             // Single instruction, i.e., output to port
331                             OFActionOutput action = OFFactories
332                                     .getFactory(msg.getVersion())
333                                     .actions()
334                                     .buildOutput()
335                                     .setPort(entry.getOutPort())
336                                     .build();
337                             OFInstruction instruction = OFFactories
338                                     .getFactory(msg.getVersion())
339                                     .instructions()
340                                     .applyActions(Collections.singletonList(action));
341                             OFFlowStatsEntry fs = getSwitch(dpid).factory().buildFlowStatsEntry()
342                                     .setMatch(entry.getMatch())
343                                     .setTableId(entry.getTableId())
344                                     .setDurationSec(entry.getDurationSec())
345                                     .setDurationNsec(entry.getDurationNsec())
346                                     .setPriority(entry.getPriority())
347                                     .setIdleTimeout(entry.getIdleTimeout())
348                                     .setHardTimeout(entry.getHardTimeout())
349                                     .setFlags(entry.getFlags())
350                                     .setCookie(entry.getCookie())
351                                     .setInstructions(Collections.singletonList(instruction))
352                                     .build();
353                             entries.add(fs);
354                         }
355                         fsr.setEntries(entries);
356
357                         flowStats = publishFlowStats(dpid, fsr.build());
358                         if (flowStats != null) {
359                             OFFlowStatsReply.Builder rep =
360                                     OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
361                             rep.setEntries(Lists.newLinkedList(flowStats));
362                             executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
363                         }
364                     } else {
365                         executorMsgs.submit(new OFMessageHandler(dpid, reply));
366                     }
367                     break;
368                 default:
369                     log.warn("Discarding unknown stats reply type {}", reply.getStatsType());
370                     break;
371             }
372             break;
373         case BARRIER_REPLY:
374             executorBarrier.submit(new OFMessageHandler(dpid, msg));
375             break;
376         case EXPERIMENTER:
377             long experimenter = ((OFExperimenter) msg).getExperimenter();
378             if (experimenter == 0x748771) {
379                 // LINC-OE port stats
380                 OFCircuitPortStatus circuitPortStatus = (OFCircuitPortStatus) msg;
381                 OFPortStatus.Builder portStatus = this.getSwitch(dpid).factory().buildPortStatus();
382                 OFPortDesc.Builder portDesc = this.getSwitch(dpid).factory().buildPortDesc();
383                 portDesc.setPortNo(circuitPortStatus.getPortNo())
384                         .setHwAddr(circuitPortStatus.getHwAddr())
385                         .setName(circuitPortStatus.getName())
386                         .setConfig(circuitPortStatus.getConfig())
387                         .setState(circuitPortStatus.getState());
388                 portStatus.setReason(circuitPortStatus.getReason()).setDesc(portDesc.build());
389                 for (OpenFlowSwitchListener l : ofSwitchListener) {
390                     l.portChanged(dpid, portStatus.build());
391                 }
392             } else {
393                 log.warn("Handling experimenter type {} not yet implemented",
394                         ((OFExperimenter) msg).getExperimenter(), msg);
395             }
396             break;
397         default:
398             log.warn("Handling message type {} not yet implemented {}",
399                     msg.getType(), msg);
400         }
401     }
402
403     private synchronized Collection<OFFlowStatsEntry> publishFlowStats(Dpid dpid,
404                                                                        OFFlowStatsReply reply) {
405         //TODO: Get rid of synchronized
406         fullFlowStats.putAll(dpid, reply.getEntries());
407         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
408             return fullFlowStats.removeAll(dpid);
409         }
410         return null;
411     }
412
413     private synchronized Collection<OFTableStatsEntry> publishTableStats(Dpid dpid,
414                                                                        OFTableStatsReply reply) {
415         //TODO: Get rid of synchronized
416         fullTableStats.putAll(dpid, reply.getEntries());
417         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
418             return fullTableStats.removeAll(dpid);
419         }
420         return null;
421     }
422
423     private synchronized Collection<OFGroupStatsEntry> publishGroupStats(Dpid dpid,
424                                                                       OFGroupStatsReply reply) {
425         //TODO: Get rid of synchronized
426         fullGroupStats.putAll(dpid, reply.getEntries());
427         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
428             return fullGroupStats.removeAll(dpid);
429         }
430         return null;
431     }
432
433     private synchronized Collection<OFGroupDescStatsEntry> publishGroupDescStats(Dpid dpid,
434                                                                   OFGroupDescStatsReply reply) {
435         //TODO: Get rid of synchronized
436         fullGroupDescStats.putAll(dpid, reply.getEntries());
437         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
438             return fullGroupDescStats.removeAll(dpid);
439         }
440         return null;
441     }
442
443     private synchronized Collection<OFPortStatsEntry> publishPortStats(Dpid dpid,
444                                                                  OFPortStatsReply reply) {
445         fullPortStats.putAll(dpid, reply.getEntries());
446         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
447             return fullPortStats.removeAll(dpid);
448         }
449         return null;
450     }
451
452     @Override
453     public void setRole(Dpid dpid, RoleState role) {
454         final OpenFlowSwitch sw = getSwitch(dpid);
455         if (sw == null) {
456             log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
457             return;
458         }
459         sw.setRole(role);
460     }
461
462     /**
463      * Implementation of an OpenFlow Agent which is responsible for
464      * keeping track of connected switches and the state in which
465      * they are.
466      */
467     public class OpenFlowSwitchAgent implements OpenFlowAgent {
468
469         private final Logger log = LoggerFactory.getLogger(OpenFlowSwitchAgent.class);
470         private final Lock switchLock = new ReentrantLock();
471
472         @Override
473         public boolean addConnectedSwitch(Dpid dpid, OpenFlowSwitch sw) {
474
475             if (connectedSwitches.get(dpid) != null) {
476                 log.error("Trying to add connectedSwitch but found a previous "
477                         + "value for dpid: {}", dpid);
478                 return false;
479             } else {
480                 log.info("Added switch {}", dpid);
481                 connectedSwitches.put(dpid, sw);
482                 for (OpenFlowSwitchListener l : ofSwitchListener) {
483                     l.switchAdded(dpid);
484                 }
485                 return true;
486             }
487         }
488
489         @Override
490         public boolean validActivation(Dpid dpid) {
491             if (connectedSwitches.get(dpid) == null) {
492                 log.error("Trying to activate switch but is not in "
493                         + "connected switches: dpid {}. Aborting ..",
494                         dpid);
495                 return false;
496             }
497             if (activeMasterSwitches.get(dpid) != null ||
498                     activeEqualSwitches.get(dpid) != null) {
499                 log.error("Trying to activate switch but it is already "
500                         + "activated: dpid {}. Found in activeMaster: {} "
501                         + "Found in activeEqual: {}. Aborting ..",
502                           dpid,
503                           (activeMasterSwitches.get(dpid) == null) ? 'N' : 'Y',
504                           (activeEqualSwitches.get(dpid) == null) ? 'N' : 'Y');
505                 return false;
506             }
507             return true;
508         }
509
510
511         @Override
512         public boolean addActivatedMasterSwitch(Dpid dpid, OpenFlowSwitch sw) {
513             switchLock.lock();
514             try {
515                 if (!validActivation(dpid)) {
516                     return false;
517                 }
518                 activeMasterSwitches.put(dpid, sw);
519                 return true;
520             } finally {
521                 switchLock.unlock();
522             }
523         }
524
525         @Override
526         public boolean addActivatedEqualSwitch(Dpid dpid, OpenFlowSwitch sw) {
527             switchLock.lock();
528             try {
529                 if (!validActivation(dpid)) {
530                     return false;
531                 }
532                 activeEqualSwitches.put(dpid, sw);
533                 log.info("Added Activated EQUAL Switch {}", dpid);
534                 return true;
535             } finally {
536                 switchLock.unlock();
537             }
538         }
539
540         @Override
541         public void transitionToMasterSwitch(Dpid dpid) {
542             switchLock.lock();
543             try {
544                 if (activeMasterSwitches.containsKey(dpid)) {
545                     return;
546                 }
547                 OpenFlowSwitch sw = activeEqualSwitches.remove(dpid);
548                 if (sw == null) {
549                     sw = getSwitch(dpid);
550                     if (sw == null) {
551                         log.error("Transition to master called on sw {}, but switch "
552                                 + "was not found in controller-cache", dpid);
553                         return;
554                     }
555                 }
556                 log.info("Transitioned switch {} to MASTER", dpid);
557                 activeMasterSwitches.put(dpid, sw);
558             } finally {
559                 switchLock.unlock();
560             }
561         }
562
563
564         @Override
565         public void transitionToEqualSwitch(Dpid dpid) {
566             switchLock.lock();
567             try {
568                 if (activeEqualSwitches.containsKey(dpid)) {
569                     return;
570                 }
571                 OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
572                 if (sw == null) {
573                     sw = getSwitch(dpid);
574                     if (sw == null) {
575                         log.error("Transition to equal called on sw {}, but switch "
576                                 + "was not found in controller-cache", dpid);
577                         return;
578                     }
579                 }
580                 log.info("Transitioned switch {} to EQUAL", dpid);
581                 activeEqualSwitches.put(dpid, sw);
582             } finally {
583                 switchLock.unlock();
584             }
585
586         }
587
588         @Override
589         public void removeConnectedSwitch(Dpid dpid) {
590             connectedSwitches.remove(dpid);
591             OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
592             if (sw == null) {
593                 log.debug("sw was null for {}", dpid);
594                 sw = activeEqualSwitches.remove(dpid);
595             }
596             for (OpenFlowSwitchListener l : ofSwitchListener) {
597                 l.switchRemoved(dpid);
598             }
599         }
600
601         @Override
602         public void processMessage(Dpid dpid, OFMessage m) {
603             processPacket(dpid, m);
604         }
605
606         @Override
607         public void returnRoleReply(Dpid dpid, RoleState requested, RoleState response) {
608             for (OpenFlowSwitchListener l : ofSwitchListener) {
609                 l.receivedRoleReply(dpid, requested, response);
610             }
611         }
612     }
613
614     protected final class OFMessageHandler implements Runnable {
615
616         protected final OFMessage msg;
617         protected final Dpid dpid;
618
619         public OFMessageHandler(Dpid dpid, OFMessage msg) {
620             this.msg = msg;
621             this.dpid = dpid;
622         }
623
624         @Override
625         public void run() {
626             for (OpenFlowEventListener listener : ofEventListener) {
627                 listener.handleMessage(dpid, msg);
628             }
629         }
630
631     }
632
633 }