05663129d40be0c8373821f6d93c4c4244944a14
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.segmentrouting;
17
18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.packet.Ethernet;
25 import org.onlab.packet.IPv4;
26 import org.onlab.util.KryoNamespace;
27 import org.onosproject.core.ApplicationId;
28 import org.onosproject.core.CoreService;
29 import org.onosproject.event.Event;
30 import org.onosproject.net.config.ConfigFactory;
31 import org.onosproject.net.config.NetworkConfigEvent;
32 import org.onosproject.net.config.NetworkConfigRegistry;
33 import org.onosproject.net.config.NetworkConfigListener;
34 import org.onosproject.net.config.basics.SubjectFactories;
35 import org.onosproject.segmentrouting.config.SegmentRoutingConfig;
36 import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
37 import org.onosproject.segmentrouting.grouphandler.NeighborSet;
38 import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
39 import org.onosproject.mastership.MastershipService;
40 import org.onosproject.net.Device;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.Link;
43 import org.onosproject.net.Port;
44 import org.onosproject.net.device.DeviceEvent;
45 import org.onosproject.net.device.DeviceListener;
46 import org.onosproject.net.device.DeviceService;
47 import org.onosproject.net.flowobjective.FlowObjectiveService;
48 import org.onosproject.net.group.GroupKey;
49 import org.onosproject.net.host.HostService;
50 import org.onosproject.net.intent.IntentService;
51 import org.onosproject.net.link.LinkEvent;
52 import org.onosproject.net.link.LinkListener;
53 import org.onosproject.net.link.LinkService;
54 import org.onosproject.net.packet.InboundPacket;
55 import org.onosproject.net.packet.PacketContext;
56 import org.onosproject.net.packet.PacketProcessor;
57 import org.onosproject.net.packet.PacketService;
58 import org.onosproject.net.topology.TopologyService;
59 import org.onosproject.store.service.EventuallyConsistentMap;
60 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
61 import org.onosproject.store.service.StorageService;
62 import org.onosproject.store.service.WallClockTimestamp;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import java.net.URI;
67 import java.util.HashSet;
68 import java.util.List;
69 import java.util.Map;
70 import java.util.concurrent.ConcurrentHashMap;
71 import java.util.concurrent.ConcurrentLinkedQueue;
72 import java.util.concurrent.Executors;
73 import java.util.concurrent.ScheduledExecutorService;
74 import java.util.concurrent.ScheduledFuture;
75 import java.util.concurrent.TimeUnit;
76
77 @SuppressWarnings("ALL")
78 @Service
79 @Component(immediate = true)
80 public class SegmentRoutingManager implements SegmentRoutingService {
81
82     private static Logger log = LoggerFactory
83             .getLogger(SegmentRoutingManager.class);
84
85     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86     protected CoreService coreService;
87
88     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89     protected TopologyService topologyService;
90
91     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92     protected PacketService packetService;
93
94     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95     protected IntentService intentService;
96
97     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98     protected HostService hostService;
99
100     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101     protected DeviceService deviceService;
102
103     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104     protected FlowObjectiveService flowObjectiveService;
105
106     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107     protected LinkService linkService;
108
109     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110     protected MastershipService mastershipService;
111
112     protected ArpHandler arpHandler = null;
113     protected IcmpHandler icmpHandler = null;
114     protected IpHandler ipHandler = null;
115     protected RoutingRulePopulator routingRulePopulator = null;
116     protected ApplicationId appId;
117     protected DeviceConfiguration deviceConfiguration = null;
118
119
120     private DefaultRoutingHandler defaultRoutingHandler = null;
121     private TunnelHandler tunnelHandler = null;
122     private PolicyHandler policyHandler = null;
123     private InternalPacketProcessor processor = new InternalPacketProcessor();
124     private InternalEventHandler eventHandler = new InternalEventHandler();
125
126     private ScheduledExecutorService executorService = Executors
127             .newScheduledThreadPool(1);
128
129     private static ScheduledFuture<?> eventHandlerFuture = null;
130     private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
131     private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
132     // Per device next objective ID store with (device id + neighbor set) as key
133     private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
134         Integer> nsNextObjStore = null;
135     private EventuallyConsistentMap<String, Tunnel> tunnelStore = null;
136     private EventuallyConsistentMap<String, Policy> policyStore = null;
137
138     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
139     protected StorageService storageService;
140
141     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
142     protected NetworkConfigRegistry cfgService;
143
144     private final InternalConfigListener cfgListener =
145             new InternalConfigListener(this);
146
147     private final ConfigFactory cfgFactory =
148             new ConfigFactory(SubjectFactories.DEVICE_SUBJECT_FACTORY,
149                               SegmentRoutingConfig.class,
150                               "segmentrouting") {
151                 @Override
152                 public SegmentRoutingConfig createConfig() {
153                     return new SegmentRoutingConfig();
154                 }
155             };
156
157     private Object threadSchedulerLock = new Object();
158     private static int numOfEventsQueued = 0;
159     private static int numOfEventsExecuted = 0;
160     private static int numOfHandlerExecution = 0;
161     private static int numOfHandlerScheduled = 0;
162
163     private KryoNamespace.Builder kryoBuilder = null;
164
165     @Activate
166     protected void activate() {
167         appId = coreService
168                 .registerApplication("org.onosproject.segmentrouting");
169
170         kryoBuilder = new KryoNamespace.Builder()
171             .register(NeighborSetNextObjectiveStoreKey.class,
172                     NeighborSet.class,
173                     DeviceId.class,
174                     URI.class,
175                     WallClockTimestamp.class,
176                     org.onosproject.cluster.NodeId.class,
177                     HashSet.class,
178                     Tunnel.class,
179                     DefaultTunnel.class,
180                     Policy.class,
181                     TunnelPolicy.class,
182                     Policy.Type.class
183             );
184
185         log.debug("Creating EC map nsnextobjectivestore");
186         EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
187                 nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
188
189         nsNextObjStore = nsNextObjMapBuilder
190                 .withName("nsnextobjectivestore")
191                 .withSerializer(kryoBuilder)
192                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
193                 .build();
194         log.trace("Current size {}", nsNextObjStore.size());
195
196         EventuallyConsistentMapBuilder<String, Tunnel> tunnelMapBuilder =
197                 storageService.eventuallyConsistentMapBuilder();
198
199         tunnelStore = tunnelMapBuilder
200                 .withName("tunnelstore")
201                 .withSerializer(kryoBuilder)
202                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
203                 .build();
204
205         EventuallyConsistentMapBuilder<String, Policy> policyMapBuilder =
206                 storageService.eventuallyConsistentMapBuilder();
207
208         policyStore = policyMapBuilder
209                 .withName("policystore")
210                 .withSerializer(kryoBuilder)
211                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
212                 .build();
213
214         cfgService.addListener(cfgListener);
215         cfgService.registerConfigFactory(cfgFactory);
216
217         log.info("Started");
218     }
219
220     @Deactivate
221     protected void deactivate() {
222         cfgService.removeListener(cfgListener);
223         cfgService.unregisterConfigFactory(cfgFactory);
224
225         packetService.removeProcessor(processor);
226         processor = null;
227         log.info("Stopped");
228     }
229
230
231     @Override
232     public List<Tunnel> getTunnels() {
233         return tunnelHandler.getTunnels();
234     }
235
236     @Override
237     public TunnelHandler.Result createTunnel(Tunnel tunnel) {
238         return tunnelHandler.createTunnel(tunnel);
239     }
240
241     @Override
242     public TunnelHandler.Result removeTunnel(Tunnel tunnel) {
243         for (Policy policy: policyHandler.getPolicies()) {
244             if (policy.type() == Policy.Type.TUNNEL_FLOW) {
245                 TunnelPolicy tunnelPolicy = (TunnelPolicy) policy;
246                 if (tunnelPolicy.tunnelId().equals(tunnel.id())) {
247                     log.warn("Cannot remove the tunnel used by a policy");
248                     return TunnelHandler.Result.TUNNEL_IN_USE;
249                 }
250             }
251         }
252         return tunnelHandler.removeTunnel(tunnel);
253     }
254
255     @Override
256     public PolicyHandler.Result removePolicy(Policy policy) {
257         return policyHandler.removePolicy(policy);
258     }
259
260     @Override
261     public PolicyHandler.Result createPolicy(Policy policy) {
262         return policyHandler.createPolicy(policy);
263     }
264
265     @Override
266     public List<Policy> getPolicies() {
267         return policyHandler.getPolicies();
268     }
269
270     /**
271      * Returns the tunnel object with the tunnel ID.
272      *
273      * @param tunnelId Tunnel ID
274      * @return Tunnel reference
275      */
276     public Tunnel getTunnel(String tunnelId) {
277         return tunnelHandler.getTunnel(tunnelId);
278     }
279
280     /**
281      * Returns the GrouopKey object for the device and the NighborSet given.
282      *
283      * @param ns NeightborSet object for the GroupKey
284      * @return GroupKey object for the NeighborSet
285      */
286     public GroupKey getGroupKey(NeighborSet ns) {
287
288         for (DefaultGroupHandler groupHandler : groupHandlerMap.values()) {
289             return groupHandler.getGroupKey(ns);
290         }
291
292         return null;
293     }
294
295     /**
296      * Returns the next objective ID for the NeighborSet given. If the nextObjectiveID does not exist,
297      * a new one is created and returned.
298      *
299      * @param deviceId Device ID
300      * @param ns NegighborSet
301      * @return next objective ID
302      */
303     public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
304
305         if (groupHandlerMap.get(deviceId) != null) {
306             log.trace("getNextObjectiveId query in device {}", deviceId);
307             return groupHandlerMap
308                     .get(deviceId).getNextObjectiveId(ns);
309         } else {
310             log.warn("getNextObjectiveId query in device {} not found", deviceId);
311             return -1;
312         }
313     }
314
315     private class InternalPacketProcessor implements PacketProcessor {
316
317         @Override
318         public void process(PacketContext context) {
319
320             if (context.isHandled()) {
321                 return;
322             }
323
324             InboundPacket pkt = context.inPacket();
325             Ethernet ethernet = pkt.parsed();
326
327             if (ethernet.getEtherType() == Ethernet.TYPE_ARP) {
328                 arpHandler.processPacketIn(pkt);
329             } else if (ethernet.getEtherType() == Ethernet.TYPE_IPV4) {
330                 IPv4 ipPacket = (IPv4) ethernet.getPayload();
331                 ipHandler.addToPacketBuffer(ipPacket);
332                 if (ipPacket.getProtocol() == IPv4.PROTOCOL_ICMP) {
333                     icmpHandler.processPacketIn(pkt);
334                 } else {
335                     ipHandler.processPacketIn(pkt);
336                 }
337             }
338         }
339     }
340
341     private class InternalLinkListener implements LinkListener {
342         @Override
343         public void event(LinkEvent event) {
344             if (event.type() == LinkEvent.Type.LINK_ADDED
345                     || event.type() == LinkEvent.Type.LINK_REMOVED) {
346                 log.debug("Event {} received from Link Service", event.type());
347                 scheduleEventHandlerIfNotScheduled(event);
348             }
349         }
350     }
351
352     private class InternalDeviceListener implements DeviceListener {
353
354         @Override
355         public void event(DeviceEvent event) {
356             /*if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
357                 log.debug("Local role {} is not MASTER for device {}",
358                           mastershipService.getLocalRole(event.subject().id()),
359                           event.subject().id());
360                 return;
361             }*/
362
363             switch (event.type()) {
364             case DEVICE_ADDED:
365             case PORT_REMOVED:
366             case DEVICE_UPDATED:
367             case DEVICE_AVAILABILITY_CHANGED:
368                 log.debug("Event {} received from Device Service", event.type());
369                 scheduleEventHandlerIfNotScheduled(event);
370                 break;
371             default:
372             }
373         }
374     }
375
376     private void scheduleEventHandlerIfNotScheduled(Event event) {
377
378         synchronized (threadSchedulerLock) {
379             eventQueue.add(event);
380             numOfEventsQueued++;
381
382             if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
383                 //No pending scheduled event handling threads. So start a new one.
384                 eventHandlerFuture = executorService
385                         .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
386                 numOfHandlerScheduled++;
387             }
388             log.trace("numOfEventsQueued {}, numOfEventHanlderScheduled {}",
389                       numOfEventsQueued,
390                       numOfHandlerScheduled);
391         }
392     }
393
394     private class InternalEventHandler implements Runnable {
395
396         @Override
397         public void run() {
398             try {
399                 while (true) {
400                     Event event = null;
401                     synchronized (threadSchedulerLock) {
402                         if (!eventQueue.isEmpty()) {
403                             event = eventQueue.poll();
404                             numOfEventsExecuted++;
405                         } else {
406                             numOfHandlerExecution++;
407                             log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
408                                       numOfHandlerExecution, numOfEventsExecuted);
409                             break;
410                         }
411                     }
412                     if (event.type() == LinkEvent.Type.LINK_ADDED) {
413                         processLinkAdded((Link) event.subject());
414                     } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
415                         processLinkRemoved((Link) event.subject());
416                     //} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
417                     //    processGroupAdded((Group) event.subject());
418                     } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
419                             event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
420                             event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
421                         if (deviceService.isAvailable(((Device) event.subject()).id())) {
422                             processDeviceAdded((Device) event.subject());
423                         }
424                     } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
425                         processPortRemoved((Device) event.subject(),
426                                            ((DeviceEvent) event).port());
427                     } else {
428                         log.warn("Unhandled event type: {}", event.type());
429                     }
430                 }
431             } catch (Exception e) {
432                 log.error("SegmentRouting event handler "
433                         + "thread thrown an exception: {}", e);
434             }
435         }
436     }
437
438     private void processLinkAdded(Link link) {
439         log.debug("A new link {} was added", link.toString());
440
441         //Irrespective whether the local is a MASTER or not for this device,
442         //create group handler instance and push default TTP flow rules.
443         //Because in a multi-instance setup, instances can initiate
444         //groups for any devices. Also the default TTP rules are needed
445         //to be pushed before inserting any IP table entries for any device
446         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
447                 .deviceId());
448         if (groupHandler != null) {
449             groupHandler.linkUp(link);
450         } else {
451             Device device = deviceService.getDevice(link.src().deviceId());
452             if (device != null) {
453                 log.warn("processLinkAdded: Link Added "
454                         + "Notification without Device Added "
455                         + "event, still handling it");
456                 processDeviceAdded(device);
457                 groupHandler = groupHandlerMap.get(link.src()
458                                                    .deviceId());
459                 groupHandler.linkUp(link);
460             }
461         }
462
463         log.trace("Starting optimized route population process");
464         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
465         //log.trace("processLinkAdded: re-starting route population process");
466         //defaultRoutingHandler.startPopulationProcess();
467     }
468
469     private void processLinkRemoved(Link link) {
470         log.debug("A link {} was removed", link.toString());
471         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src().deviceId());
472         if (groupHandler != null) {
473             groupHandler.portDown(link.src().port());
474         }
475         log.trace("Starting optimized route population process");
476         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
477         //log.trace("processLinkRemoved: re-starting route population process");
478         //defaultRoutingHandler.startPopulationProcess();
479     }
480
481     private void processDeviceAdded(Device device) {
482         log.debug("A new device with ID {} was added", device.id());
483         //Irrespective whether the local is a MASTER or not for this device,
484         //create group handler instance and push default TTP flow rules.
485         //Because in a multi-instance setup, instances can initiate
486         //groups for any devices. Also the default TTP rules are needed
487         //to be pushed before inserting any IP table entries for any device
488         DefaultGroupHandler dgh = DefaultGroupHandler.
489                 createGroupHandler(device.id(),
490                                    appId,
491                                    deviceConfiguration,
492                                    linkService,
493                                    flowObjectiveService,
494                                    nsNextObjStore);
495         groupHandlerMap.put(device.id(), dgh);
496         defaultRoutingHandler.populateTtpRules(device.id());
497     }
498
499     private void processPortRemoved(Device device, Port port) {
500         log.debug("Port {} was removed", port.toString());
501         DefaultGroupHandler groupHandler = groupHandlerMap.get(device.id());
502         if (groupHandler != null) {
503             groupHandler.portDown(port.number());
504         }
505     }
506
507     private class InternalConfigListener implements NetworkConfigListener {
508         SegmentRoutingManager segmentRoutingManager;
509
510         public InternalConfigListener(SegmentRoutingManager srMgr) {
511             this.segmentRoutingManager = srMgr;
512         }
513
514         public void configureNetwork() {
515             deviceConfiguration = new DeviceConfiguration(segmentRoutingManager.cfgService);
516
517             arpHandler = new ArpHandler(segmentRoutingManager);
518             icmpHandler = new IcmpHandler(segmentRoutingManager);
519             ipHandler = new IpHandler(segmentRoutingManager);
520             routingRulePopulator = new RoutingRulePopulator(segmentRoutingManager);
521             defaultRoutingHandler = new DefaultRoutingHandler(segmentRoutingManager);
522
523             tunnelHandler = new TunnelHandler(linkService, deviceConfiguration,
524                                               groupHandlerMap, tunnelStore);
525             policyHandler = new PolicyHandler(appId, deviceConfiguration,
526                                               flowObjectiveService,
527                                               tunnelHandler, policyStore);
528
529             packetService.addProcessor(processor, PacketProcessor.director(2));
530             linkService.addListener(new InternalLinkListener());
531             deviceService.addListener(new InternalDeviceListener());
532
533             for (Device device : deviceService.getDevices()) {
534                 //Irrespective whether the local is a MASTER or not for this device,
535                 //create group handler instance and push default TTP flow rules.
536                 //Because in a multi-instance setup, instances can initiate
537                 //groups for any devices. Also the default TTP rules are needed
538                 //to be pushed before inserting any IP table entries for any device
539                 DefaultGroupHandler groupHandler = DefaultGroupHandler
540                         .createGroupHandler(device.id(), appId,
541                                             deviceConfiguration, linkService,
542                                             flowObjectiveService,
543                                             nsNextObjStore);
544                 groupHandlerMap.put(device.id(), groupHandler);
545                 defaultRoutingHandler.populateTtpRules(device.id());
546             }
547
548             defaultRoutingHandler.startPopulationProcess();
549         }
550
551         @Override
552         public void event(NetworkConfigEvent event) {
553             if ((event.type() == NetworkConfigEvent.Type.CONFIG_ADDED ||
554                     event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) &&
555                     event.configClass().equals(SegmentRoutingConfig.class)) {
556                 log.info("Network configuration change detected. (Re)Configuring...");
557                 configureNetwork();
558                 return;
559             }
560         }
561     }
562 }