c82078a1bad48a643180554575b755842a0a1cae
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.openflow.controller.impl;
17
18 import com.google.common.base.Strings;
19 import com.google.common.collect.ArrayListMultimap;
20 import com.google.common.collect.Lists;
21 import com.google.common.collect.Multimap;
22 import org.apache.felix.scr.annotations.Activate;
23 import org.apache.felix.scr.annotations.Component;
24 import org.apache.felix.scr.annotations.Deactivate;
25 import org.apache.felix.scr.annotations.Modified;
26 import org.apache.felix.scr.annotations.Property;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onosproject.cfg.ComponentConfigService;
31 import org.onosproject.net.driver.DefaultDriverProviderService;
32 import org.onosproject.net.driver.DriverService;
33 import org.onosproject.openflow.controller.DefaultOpenFlowPacketContext;
34 import org.onosproject.openflow.controller.Dpid;
35 import org.onosproject.openflow.controller.OpenFlowController;
36 import org.onosproject.openflow.controller.OpenFlowEventListener;
37 import org.onosproject.openflow.controller.OpenFlowPacketContext;
38 import org.onosproject.openflow.controller.OpenFlowSwitch;
39 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
40 import org.onosproject.openflow.controller.PacketListener;
41 import org.onosproject.openflow.controller.RoleState;
42 import org.onosproject.openflow.controller.driver.OpenFlowAgent;
43 import org.osgi.service.component.ComponentContext;
44 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsEntry;
45 import org.projectfloodlight.openflow.protocol.OFCalientFlowStatsReply;
46 import org.projectfloodlight.openflow.protocol.OFCircuitPortStatus;
47 import org.projectfloodlight.openflow.protocol.OFExperimenter;
48 import org.projectfloodlight.openflow.protocol.OFFactories;
49 import org.projectfloodlight.openflow.protocol.OFFlowStatsEntry;
50 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
51 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsEntry;
52 import org.projectfloodlight.openflow.protocol.OFGroupDescStatsReply;
53 import org.projectfloodlight.openflow.protocol.OFGroupStatsEntry;
54 import org.projectfloodlight.openflow.protocol.OFGroupStatsReply;
55 import org.projectfloodlight.openflow.protocol.OFMessage;
56 import org.projectfloodlight.openflow.protocol.OFPacketIn;
57 import org.projectfloodlight.openflow.protocol.OFPortDesc;
58 import org.projectfloodlight.openflow.protocol.OFPortStatsEntry;
59 import org.projectfloodlight.openflow.protocol.OFPortStatsReply;
60 import org.projectfloodlight.openflow.protocol.OFPortStatus;
61 import org.projectfloodlight.openflow.protocol.OFStatsReply;
62 import org.projectfloodlight.openflow.protocol.OFStatsReplyFlags;
63 import org.projectfloodlight.openflow.protocol.action.OFActionOutput;
64 import org.projectfloodlight.openflow.protocol.instruction.OFInstruction;
65 import org.slf4j.Logger;
66 import org.slf4j.LoggerFactory;
67
68 import java.util.Collection;
69 import java.util.Collections;
70 import java.util.Dictionary;
71 import java.util.HashMap;
72 import java.util.LinkedList;
73 import java.util.List;
74 import java.util.Map;
75 import java.util.Set;
76 import java.util.concurrent.ConcurrentHashMap;
77 import java.util.concurrent.CopyOnWriteArraySet;
78 import java.util.concurrent.ExecutorService;
79 import java.util.concurrent.Executors;
80 import java.util.concurrent.locks.Lock;
81 import java.util.concurrent.locks.ReentrantLock;
82
83 import static org.onlab.util.Tools.get;
84 import static org.onlab.util.Tools.groupedThreads;
85
86 @Component(immediate = true)
87 @Service
88 public class OpenFlowControllerImpl implements OpenFlowController {
89     private static final int DEFAULT_OFPORT = 6633;
90     private static final int DEFAULT_WORKER_THREADS = 16;
91
92     private static final Logger log =
93             LoggerFactory.getLogger(OpenFlowControllerImpl.class);
94
95     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96     protected DriverService driverService;
97
98     // References exists merely for sequencing purpose to assure drivers are loaded
99     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100     protected DefaultDriverProviderService defaultDriverProviderService;
101
102     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103     protected ComponentConfigService cfgService;
104
105     @Property(name = "openflowPort", intValue = DEFAULT_OFPORT,
106             label = "Port number used by OpenFlow protocol; default is 6633")
107     private int openflowPort = DEFAULT_OFPORT;
108
109     @Property(name = "workerThreads", intValue = DEFAULT_WORKER_THREADS,
110             label = "Number of controller worker threads; default is 16")
111     private int workerThreads = DEFAULT_WORKER_THREADS;
112
113     private final ExecutorService executorMsgs =
114         Executors.newFixedThreadPool(32, groupedThreads("onos/of", "event-stats-%d"));
115
116     private final ExecutorService executorBarrier =
117         Executors.newFixedThreadPool(4, groupedThreads("onos/of", "event-barrier-%d"));
118
119     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> connectedSwitches =
120             new ConcurrentHashMap<>();
121     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeMasterSwitches =
122             new ConcurrentHashMap<>();
123     protected ConcurrentHashMap<Dpid, OpenFlowSwitch> activeEqualSwitches =
124             new ConcurrentHashMap<>();
125
126     protected OpenFlowSwitchAgent agent = new OpenFlowSwitchAgent();
127     protected Set<OpenFlowSwitchListener> ofSwitchListener = new CopyOnWriteArraySet<>();
128
129     protected Multimap<Integer, PacketListener> ofPacketListener =
130             ArrayListMultimap.create();
131
132     protected Set<OpenFlowEventListener> ofEventListener = new CopyOnWriteArraySet<>();
133
134     protected Multimap<Dpid, OFFlowStatsEntry> fullFlowStats =
135             ArrayListMultimap.create();
136
137     protected Multimap<Dpid, OFGroupStatsEntry> fullGroupStats =
138             ArrayListMultimap.create();
139
140     protected Multimap<Dpid, OFGroupDescStatsEntry> fullGroupDescStats =
141             ArrayListMultimap.create();
142
143     protected Multimap<Dpid, OFPortStatsEntry> fullPortStats =
144             ArrayListMultimap.create();
145
146     private final Controller ctrl = new Controller();
147
148     @Activate
149     public void activate(ComponentContext context) {
150         cfgService.registerProperties(getClass());
151         Map<String, String> properties = readComponentConfiguration(context);
152         ctrl.setConfigParams(properties);
153         ctrl.start(agent, driverService);
154     }
155
156     @Deactivate
157     public void deactivate() {
158         cfgService.unregisterProperties(getClass(), false);
159         ctrl.stop();
160     }
161
162     /**
163      * Extracts properties from the component configuration context.
164      *
165      * @param context the component context
166      */
167     private Map<String, String> readComponentConfiguration(ComponentContext context) {
168         Dictionary<?, ?> properties = context.getProperties();
169         Map<String, String> outProperties = new HashMap<>();
170
171         String port = get(properties, "openflowPort");
172         if (!Strings.isNullOrEmpty(port)) {
173             outProperties.put("openflowport", port);
174         }
175
176         String thread = get(properties, "workerThreads");
177         if (!Strings.isNullOrEmpty(thread)) {
178             outProperties.put("workerthreads", thread);
179         }
180
181         return outProperties;
182     }
183
184     @Modified
185     public void modified(ComponentContext context) {
186         Map<String, String> properties = readComponentConfiguration(context);
187         ctrl.stop();
188         ctrl.setConfigParams(properties);
189         ctrl.start(agent, driverService);
190     }
191
192     @Override
193     public Iterable<OpenFlowSwitch> getSwitches() {
194         return connectedSwitches.values();
195     }
196
197     @Override
198     public Iterable<OpenFlowSwitch> getMasterSwitches() {
199         return activeMasterSwitches.values();
200     }
201
202     @Override
203     public Iterable<OpenFlowSwitch> getEqualSwitches() {
204         return activeEqualSwitches.values();
205     }
206
207     @Override
208     public OpenFlowSwitch getSwitch(Dpid dpid) {
209         return connectedSwitches.get(dpid);
210     }
211
212     @Override
213     public OpenFlowSwitch getMasterSwitch(Dpid dpid) {
214         return activeMasterSwitches.get(dpid);
215     }
216
217     @Override
218     public OpenFlowSwitch getEqualSwitch(Dpid dpid) {
219         return activeEqualSwitches.get(dpid);
220     }
221
222     @Override
223     public void addListener(OpenFlowSwitchListener listener) {
224         if (!ofSwitchListener.contains(listener)) {
225             this.ofSwitchListener.add(listener);
226         }
227     }
228
229     @Override
230     public void removeListener(OpenFlowSwitchListener listener) {
231         this.ofSwitchListener.remove(listener);
232     }
233
234     @Override
235     public void addPacketListener(int priority, PacketListener listener) {
236         ofPacketListener.put(priority, listener);
237     }
238
239     @Override
240     public void removePacketListener(PacketListener listener) {
241         ofPacketListener.values().remove(listener);
242     }
243
244     @Override
245     public void addEventListener(OpenFlowEventListener listener) {
246         ofEventListener.add(listener);
247     }
248
249     @Override
250     public void removeEventListener(OpenFlowEventListener listener) {
251         ofEventListener.remove(listener);
252     }
253
254     @Override
255     public void write(Dpid dpid, OFMessage msg) {
256         this.getSwitch(dpid).sendMsg(msg);
257     }
258
259     @Override
260     public void processPacket(Dpid dpid, OFMessage msg) {
261         Collection<OFFlowStatsEntry> flowStats;
262         Collection<OFGroupStatsEntry> groupStats;
263         Collection<OFGroupDescStatsEntry> groupDescStats;
264         Collection<OFPortStatsEntry> portStats;
265
266         switch (msg.getType()) {
267         case PORT_STATUS:
268             for (OpenFlowSwitchListener l : ofSwitchListener) {
269                 l.portChanged(dpid, (OFPortStatus) msg);
270             }
271             break;
272         case FEATURES_REPLY:
273             for (OpenFlowSwitchListener l : ofSwitchListener) {
274                 l.switchChanged(dpid);
275             }
276             break;
277         case PACKET_IN:
278             OpenFlowPacketContext pktCtx = DefaultOpenFlowPacketContext
279             .packetContextFromPacketIn(this.getSwitch(dpid),
280                     (OFPacketIn) msg);
281             for (PacketListener p : ofPacketListener.values()) {
282                 p.handlePacket(pktCtx);
283             }
284             break;
285         // TODO: Consider using separate threadpool for sensitive messages.
286         //    ie. Back to back error could cause us to starve.
287         case FLOW_REMOVED:
288         case ERROR:
289             executorMsgs.submit(new OFMessageHandler(dpid, msg));
290             break;
291         case STATS_REPLY:
292             OFStatsReply reply = (OFStatsReply) msg;
293             switch (reply.getStatsType()) {
294                 case PORT_DESC:
295                     for (OpenFlowSwitchListener l : ofSwitchListener) {
296                         l.switchChanged(dpid);
297                     }
298                     break;
299                 case FLOW:
300                     flowStats = publishFlowStats(dpid, (OFFlowStatsReply) reply);
301                     if (flowStats != null) {
302                         OFFlowStatsReply.Builder rep =
303                                 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
304                         rep.setEntries(Lists.newLinkedList(flowStats));
305                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
306                     }
307                     break;
308                 case GROUP:
309                     groupStats = publishGroupStats(dpid, (OFGroupStatsReply) reply);
310                     if (groupStats != null) {
311                         OFGroupStatsReply.Builder rep =
312                                 OFFactories.getFactory(msg.getVersion()).buildGroupStatsReply();
313                         rep.setEntries(Lists.newLinkedList(groupStats));
314                         rep.setXid(reply.getXid());
315                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
316                     }
317                     break;
318                 case GROUP_DESC:
319                     groupDescStats = publishGroupDescStats(dpid,
320                             (OFGroupDescStatsReply) reply);
321                     if (groupDescStats != null) {
322                         OFGroupDescStatsReply.Builder rep =
323                                 OFFactories.getFactory(msg.getVersion()).buildGroupDescStatsReply();
324                         rep.setEntries(Lists.newLinkedList(groupDescStats));
325                         rep.setXid(reply.getXid());
326                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
327                     }
328                     break;
329                 case PORT:
330                     executorMsgs.submit(new OFMessageHandler(dpid, reply));
331                     break;
332                 case METER:
333                     executorMsgs.submit(new OFMessageHandler(dpid, reply));
334                     break;
335                 case EXPERIMENTER:
336                     if (reply instanceof OFCalientFlowStatsReply) {
337                         // Convert Calient flow statistics to regular flow stats
338                         // TODO: parse remaining fields such as power levels etc. when we have proper monitoring API
339                         OFFlowStatsReply.Builder fsr = getSwitch(dpid).factory().buildFlowStatsReply();
340                         List<OFFlowStatsEntry> entries = new LinkedList<>();
341                         for (OFCalientFlowStatsEntry entry : ((OFCalientFlowStatsReply) msg).getEntries()) {
342
343                             // Single instruction, i.e., output to port
344                             OFActionOutput action = OFFactories
345                                     .getFactory(msg.getVersion())
346                                     .actions()
347                                     .buildOutput()
348                                     .setPort(entry.getOutPort())
349                                     .build();
350                             OFInstruction instruction = OFFactories
351                                     .getFactory(msg.getVersion())
352                                     .instructions()
353                                     .applyActions(Collections.singletonList(action));
354                             OFFlowStatsEntry fs = getSwitch(dpid).factory().buildFlowStatsEntry()
355                                     .setMatch(entry.getMatch())
356                                     .setTableId(entry.getTableId())
357                                     .setDurationSec(entry.getDurationSec())
358                                     .setDurationNsec(entry.getDurationNsec())
359                                     .setPriority(entry.getPriority())
360                                     .setIdleTimeout(entry.getIdleTimeout())
361                                     .setHardTimeout(entry.getHardTimeout())
362                                     .setFlags(entry.getFlags())
363                                     .setCookie(entry.getCookie())
364                                     .setInstructions(Collections.singletonList(instruction))
365                                     .build();
366                             entries.add(fs);
367                         }
368                         fsr.setEntries(entries);
369
370                         flowStats = publishFlowStats(dpid, fsr.build());
371                         if (flowStats != null) {
372                             OFFlowStatsReply.Builder rep =
373                                     OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
374                             rep.setEntries(Lists.newLinkedList(flowStats));
375                             executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
376                         }
377                     } else {
378                         executorMsgs.submit(new OFMessageHandler(dpid, reply));
379                     }
380                     break;
381                 default:
382                     log.warn("Discarding unknown stats reply type {}", reply.getStatsType());
383                     break;
384             }
385             break;
386         case BARRIER_REPLY:
387             executorBarrier.submit(new OFMessageHandler(dpid, msg));
388             break;
389         case EXPERIMENTER:
390             long experimenter = ((OFExperimenter) msg).getExperimenter();
391             if (experimenter == 0x748771) {
392                 // LINC-OE port stats
393                 OFCircuitPortStatus circuitPortStatus = (OFCircuitPortStatus) msg;
394                 OFPortStatus.Builder portStatus = this.getSwitch(dpid).factory().buildPortStatus();
395                 OFPortDesc.Builder portDesc = this.getSwitch(dpid).factory().buildPortDesc();
396                 portDesc.setPortNo(circuitPortStatus.getPortNo())
397                         .setHwAddr(circuitPortStatus.getHwAddr())
398                         .setName(circuitPortStatus.getName())
399                         .setConfig(circuitPortStatus.getConfig())
400                         .setState(circuitPortStatus.getState());
401                 portStatus.setReason(circuitPortStatus.getReason()).setDesc(portDesc.build());
402                 for (OpenFlowSwitchListener l : ofSwitchListener) {
403                     l.portChanged(dpid, portStatus.build());
404                 }
405             } else {
406                 log.warn("Handling experimenter type {} not yet implemented",
407                         ((OFExperimenter) msg).getExperimenter(), msg);
408             }
409             break;
410         default:
411             log.warn("Handling message type {} not yet implemented {}",
412                     msg.getType(), msg);
413         }
414     }
415
416     private synchronized Collection<OFFlowStatsEntry> publishFlowStats(Dpid dpid,
417                                                                        OFFlowStatsReply reply) {
418         //TODO: Get rid of synchronized
419         fullFlowStats.putAll(dpid, reply.getEntries());
420         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
421             return fullFlowStats.removeAll(dpid);
422         }
423         return null;
424     }
425
426     private synchronized Collection<OFGroupStatsEntry> publishGroupStats(Dpid dpid,
427                                                                       OFGroupStatsReply reply) {
428         //TODO: Get rid of synchronized
429         fullGroupStats.putAll(dpid, reply.getEntries());
430         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
431             return fullGroupStats.removeAll(dpid);
432         }
433         return null;
434     }
435
436     private synchronized Collection<OFGroupDescStatsEntry> publishGroupDescStats(Dpid dpid,
437                                                                   OFGroupDescStatsReply reply) {
438         //TODO: Get rid of synchronized
439         fullGroupDescStats.putAll(dpid, reply.getEntries());
440         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
441             return fullGroupDescStats.removeAll(dpid);
442         }
443         return null;
444     }
445
446     private synchronized Collection<OFPortStatsEntry> publishPortStats(Dpid dpid,
447                                                                  OFPortStatsReply reply) {
448         fullPortStats.putAll(dpid, reply.getEntries());
449         if (!reply.getFlags().contains(OFStatsReplyFlags.REPLY_MORE)) {
450             return fullPortStats.removeAll(dpid);
451         }
452         return null;
453     }
454
455     @Override
456     public void setRole(Dpid dpid, RoleState role) {
457         final OpenFlowSwitch sw = getSwitch(dpid);
458         if (sw == null) {
459             log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
460             return;
461         }
462         sw.setRole(role);
463     }
464
465     /**
466      * Implementation of an OpenFlow Agent which is responsible for
467      * keeping track of connected switches and the state in which
468      * they are.
469      */
470     public class OpenFlowSwitchAgent implements OpenFlowAgent {
471
472         private final Logger log = LoggerFactory.getLogger(OpenFlowSwitchAgent.class);
473         private final Lock switchLock = new ReentrantLock();
474
475         @Override
476         public boolean addConnectedSwitch(Dpid dpid, OpenFlowSwitch sw) {
477
478             if (connectedSwitches.get(dpid) != null) {
479                 log.error("Trying to add connectedSwitch but found a previous "
480                         + "value for dpid: {}", dpid);
481                 return false;
482             } else {
483                 log.info("Added switch {}", dpid);
484                 connectedSwitches.put(dpid, sw);
485                 for (OpenFlowSwitchListener l : ofSwitchListener) {
486                     l.switchAdded(dpid);
487                 }
488                 return true;
489             }
490         }
491
492         @Override
493         public boolean validActivation(Dpid dpid) {
494             if (connectedSwitches.get(dpid) == null) {
495                 log.error("Trying to activate switch but is not in "
496                         + "connected switches: dpid {}. Aborting ..",
497                         dpid);
498                 return false;
499             }
500             if (activeMasterSwitches.get(dpid) != null ||
501                     activeEqualSwitches.get(dpid) != null) {
502                 log.error("Trying to activate switch but it is already "
503                         + "activated: dpid {}. Found in activeMaster: {} "
504                         + "Found in activeEqual: {}. Aborting ..", new Object[]{
505                                 dpid,
506                                 (activeMasterSwitches.get(dpid) == null) ? 'N' : 'Y',
507                                         (activeEqualSwitches.get(dpid) == null) ? 'N' : 'Y'});
508                 return false;
509             }
510             return true;
511         }
512
513
514         @Override
515         public boolean addActivatedMasterSwitch(Dpid dpid, OpenFlowSwitch sw) {
516             switchLock.lock();
517             try {
518                 if (!validActivation(dpid)) {
519                     return false;
520                 }
521                 activeMasterSwitches.put(dpid, sw);
522                 return true;
523             } finally {
524                 switchLock.unlock();
525             }
526         }
527
528         @Override
529         public boolean addActivatedEqualSwitch(Dpid dpid, OpenFlowSwitch sw) {
530             switchLock.lock();
531             try {
532                 if (!validActivation(dpid)) {
533                     return false;
534                 }
535                 activeEqualSwitches.put(dpid, sw);
536                 log.info("Added Activated EQUAL Switch {}", dpid);
537                 return true;
538             } finally {
539                 switchLock.unlock();
540             }
541         }
542
543         @Override
544         public void transitionToMasterSwitch(Dpid dpid) {
545             switchLock.lock();
546             try {
547                 if (activeMasterSwitches.containsKey(dpid)) {
548                     return;
549                 }
550                 OpenFlowSwitch sw = activeEqualSwitches.remove(dpid);
551                 if (sw == null) {
552                     sw = getSwitch(dpid);
553                     if (sw == null) {
554                         log.error("Transition to master called on sw {}, but switch "
555                                 + "was not found in controller-cache", dpid);
556                         return;
557                     }
558                 }
559                 log.info("Transitioned switch {} to MASTER", dpid);
560                 activeMasterSwitches.put(dpid, sw);
561             } finally {
562                 switchLock.unlock();
563             }
564         }
565
566
567         @Override
568         public void transitionToEqualSwitch(Dpid dpid) {
569             switchLock.lock();
570             try {
571                 if (activeEqualSwitches.containsKey(dpid)) {
572                     return;
573                 }
574                 OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
575                 if (sw == null) {
576                     sw = getSwitch(dpid);
577                     if (sw == null) {
578                         log.error("Transition to equal called on sw {}, but switch "
579                                 + "was not found in controller-cache", dpid);
580                         return;
581                     }
582                 }
583                 log.info("Transitioned switch {} to EQUAL", dpid);
584                 activeEqualSwitches.put(dpid, sw);
585             } finally {
586                 switchLock.unlock();
587             }
588
589         }
590
591         @Override
592         public void removeConnectedSwitch(Dpid dpid) {
593             connectedSwitches.remove(dpid);
594             OpenFlowSwitch sw = activeMasterSwitches.remove(dpid);
595             if (sw == null) {
596                 log.debug("sw was null for {}", dpid);
597                 sw = activeEqualSwitches.remove(dpid);
598             }
599             for (OpenFlowSwitchListener l : ofSwitchListener) {
600                 l.switchRemoved(dpid);
601             }
602         }
603
604         @Override
605         public void processMessage(Dpid dpid, OFMessage m) {
606             processPacket(dpid, m);
607         }
608
609         @Override
610         public void returnRoleReply(Dpid dpid, RoleState requested, RoleState response) {
611             for (OpenFlowSwitchListener l : ofSwitchListener) {
612                 l.receivedRoleReply(dpid, requested, response);
613             }
614         }
615     }
616
617     private final class OFMessageHandler implements Runnable {
618
619         private final OFMessage msg;
620         private final Dpid dpid;
621
622         public OFMessageHandler(Dpid dpid, OFMessage msg) {
623             this.msg = msg;
624             this.dpid = dpid;
625         }
626
627         @Override
628         public void run() {
629             for (OpenFlowEventListener listener : ofEventListener) {
630                 listener.handleMessage(dpid, msg);
631             }
632         }
633
634     }
635
636 }