eb2cf5694cb19dfcd281249f451fec69795a1ac6
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.segmentrouting;
17
18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.packet.Ethernet;
25 import org.onlab.packet.IPv4;
26 import org.onlab.util.KryoNamespace;
27 import org.onosproject.core.ApplicationId;
28 import org.onosproject.core.CoreService;
29 import org.onosproject.event.Event;
30 import org.onosproject.net.config.ConfigFactory;
31 import org.onosproject.net.config.NetworkConfigEvent;
32 import org.onosproject.net.config.NetworkConfigRegistry;
33 import org.onosproject.net.config.NetworkConfigListener;
34 import org.onosproject.net.config.basics.SubjectFactories;
35 import org.onosproject.segmentrouting.config.SegmentRoutingConfig;
36 import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
37 import org.onosproject.segmentrouting.grouphandler.NeighborSet;
38 import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
39 import org.onosproject.mastership.MastershipService;
40 import org.onosproject.net.Device;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.Link;
43 import org.onosproject.net.Port;
44 import org.onosproject.net.device.DeviceEvent;
45 import org.onosproject.net.device.DeviceListener;
46 import org.onosproject.net.device.DeviceService;
47 import org.onosproject.net.flowobjective.FlowObjectiveService;
48 import org.onosproject.net.group.GroupKey;
49 import org.onosproject.net.host.HostService;
50 import org.onosproject.net.intent.IntentService;
51 import org.onosproject.net.link.LinkEvent;
52 import org.onosproject.net.link.LinkListener;
53 import org.onosproject.net.link.LinkService;
54 import org.onosproject.net.packet.InboundPacket;
55 import org.onosproject.net.packet.PacketContext;
56 import org.onosproject.net.packet.PacketProcessor;
57 import org.onosproject.net.packet.PacketService;
58 import org.onosproject.net.topology.TopologyService;
59 import org.onosproject.store.service.EventuallyConsistentMap;
60 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
61 import org.onosproject.store.service.StorageService;
62 import org.onosproject.store.service.WallClockTimestamp;
63 import org.slf4j.Logger;
64 import org.slf4j.LoggerFactory;
65
66 import java.net.URI;
67 import java.util.HashSet;
68 import java.util.List;
69 import java.util.Map;
70 import java.util.concurrent.ConcurrentHashMap;
71 import java.util.concurrent.ConcurrentLinkedQueue;
72 import java.util.concurrent.Executors;
73 import java.util.concurrent.ScheduledExecutorService;
74 import java.util.concurrent.ScheduledFuture;
75 import java.util.concurrent.TimeUnit;
76
77 @SuppressWarnings("ALL")
78 @Service
79 @Component(immediate = true)
80 public class SegmentRoutingManager implements SegmentRoutingService {
81
82     private static Logger log = LoggerFactory
83             .getLogger(SegmentRoutingManager.class);
84
85     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
86     protected CoreService coreService;
87
88     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
89     protected TopologyService topologyService;
90
91     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
92     protected PacketService packetService;
93
94     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
95     protected IntentService intentService;
96
97     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
98     protected HostService hostService;
99
100     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101     protected DeviceService deviceService;
102
103     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104     protected FlowObjectiveService flowObjectiveService;
105
106     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107     protected LinkService linkService;
108
109     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
110     protected MastershipService mastershipService;
111
112     protected ArpHandler arpHandler = null;
113     protected IcmpHandler icmpHandler = null;
114     protected IpHandler ipHandler = null;
115     protected RoutingRulePopulator routingRulePopulator = null;
116     protected ApplicationId appId;
117     protected DeviceConfiguration deviceConfiguration = null;
118
119     private DefaultRoutingHandler defaultRoutingHandler = null;
120     private TunnelHandler tunnelHandler = null;
121     private PolicyHandler policyHandler = null;
122     private InternalPacketProcessor processor = null;
123     private InternalLinkListener linkListener = null;
124     private InternalDeviceListener deviceListener = null;
125     private InternalEventHandler eventHandler = new InternalEventHandler();
126
127     private ScheduledExecutorService executorService = Executors
128             .newScheduledThreadPool(1);
129
130     private static ScheduledFuture<?> eventHandlerFuture = null;
131     private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
132     private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
133     // Per device next objective ID store with (device id + neighbor set) as key
134     private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
135         Integer> nsNextObjStore = null;
136     private EventuallyConsistentMap<String, Tunnel> tunnelStore = null;
137     private EventuallyConsistentMap<String, Policy> policyStore = null;
138
139     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
140     protected StorageService storageService;
141
142     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
143     protected NetworkConfigRegistry cfgService;
144
145     private final InternalConfigListener cfgListener =
146             new InternalConfigListener(this);
147
148     private final ConfigFactory cfgFactory =
149             new ConfigFactory(SubjectFactories.DEVICE_SUBJECT_FACTORY,
150                               SegmentRoutingConfig.class,
151                               "segmentrouting") {
152                 @Override
153                 public SegmentRoutingConfig createConfig() {
154                     return new SegmentRoutingConfig();
155                 }
156             };
157
158     private Object threadSchedulerLock = new Object();
159     private static int numOfEventsQueued = 0;
160     private static int numOfEventsExecuted = 0;
161     private static int numOfHandlerExecution = 0;
162     private static int numOfHandlerScheduled = 0;
163
164     private KryoNamespace.Builder kryoBuilder = null;
165
166     @Activate
167     protected void activate() {
168         appId = coreService
169                 .registerApplication("org.onosproject.segmentrouting");
170
171         kryoBuilder = new KryoNamespace.Builder()
172             .register(NeighborSetNextObjectiveStoreKey.class,
173                     NeighborSet.class,
174                     DeviceId.class,
175                     URI.class,
176                     WallClockTimestamp.class,
177                     org.onosproject.cluster.NodeId.class,
178                     HashSet.class,
179                     Tunnel.class,
180                     DefaultTunnel.class,
181                     Policy.class,
182                     TunnelPolicy.class,
183                     Policy.Type.class
184             );
185
186         log.debug("Creating EC map nsnextobjectivestore");
187         EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
188                 nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
189
190         nsNextObjStore = nsNextObjMapBuilder
191                 .withName("nsnextobjectivestore")
192                 .withSerializer(kryoBuilder)
193                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
194                 .build();
195         log.trace("Current size {}", nsNextObjStore.size());
196
197         EventuallyConsistentMapBuilder<String, Tunnel> tunnelMapBuilder =
198                 storageService.eventuallyConsistentMapBuilder();
199
200         tunnelStore = tunnelMapBuilder
201                 .withName("tunnelstore")
202                 .withSerializer(kryoBuilder)
203                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
204                 .build();
205
206         EventuallyConsistentMapBuilder<String, Policy> policyMapBuilder =
207                 storageService.eventuallyConsistentMapBuilder();
208
209         policyStore = policyMapBuilder
210                 .withName("policystore")
211                 .withSerializer(kryoBuilder)
212                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
213                 .build();
214
215         cfgService.addListener(cfgListener);
216         cfgService.registerConfigFactory(cfgFactory);
217
218         processor = new InternalPacketProcessor();
219         linkListener = new InternalLinkListener();
220         deviceListener = new InternalDeviceListener();
221
222         packetService.addProcessor(processor, PacketProcessor.director(2));
223         linkService.addListener(linkListener);
224         deviceService.addListener(deviceListener);
225
226         cfgListener.configureNetwork();
227
228         log.info("Started");
229     }
230
231     @Deactivate
232     protected void deactivate() {
233         cfgService.removeListener(cfgListener);
234         cfgService.unregisterConfigFactory(cfgFactory);
235
236         packetService.removeProcessor(processor);
237         linkService.removeListener(linkListener);
238         deviceService.removeListener(deviceListener);
239         processor = null;
240         linkListener = null;
241         deviceService = null;
242
243         groupHandlerMap.clear();
244
245         log.info("Stopped");
246     }
247
248
249     @Override
250     public List<Tunnel> getTunnels() {
251         return tunnelHandler.getTunnels();
252     }
253
254     @Override
255     public TunnelHandler.Result createTunnel(Tunnel tunnel) {
256         return tunnelHandler.createTunnel(tunnel);
257     }
258
259     @Override
260     public TunnelHandler.Result removeTunnel(Tunnel tunnel) {
261         for (Policy policy: policyHandler.getPolicies()) {
262             if (policy.type() == Policy.Type.TUNNEL_FLOW) {
263                 TunnelPolicy tunnelPolicy = (TunnelPolicy) policy;
264                 if (tunnelPolicy.tunnelId().equals(tunnel.id())) {
265                     log.warn("Cannot remove the tunnel used by a policy");
266                     return TunnelHandler.Result.TUNNEL_IN_USE;
267                 }
268             }
269         }
270         return tunnelHandler.removeTunnel(tunnel);
271     }
272
273     @Override
274     public PolicyHandler.Result removePolicy(Policy policy) {
275         return policyHandler.removePolicy(policy);
276     }
277
278     @Override
279     public PolicyHandler.Result createPolicy(Policy policy) {
280         return policyHandler.createPolicy(policy);
281     }
282
283     @Override
284     public List<Policy> getPolicies() {
285         return policyHandler.getPolicies();
286     }
287
288     /**
289      * Returns the tunnel object with the tunnel ID.
290      *
291      * @param tunnelId Tunnel ID
292      * @return Tunnel reference
293      */
294     public Tunnel getTunnel(String tunnelId) {
295         return tunnelHandler.getTunnel(tunnelId);
296     }
297
298     /**
299      * Returns the GroupKey object for the device and the NeighborSet given.
300      * XXX is this called
301      *
302      * @param ns NeightborSet object for the GroupKey
303      * @return GroupKey object for the NeighborSet
304      */
305     public GroupKey getGroupKey(NeighborSet ns) {
306         for (DefaultGroupHandler groupHandler : groupHandlerMap.values()) {
307             return groupHandler.getGroupKey(ns);
308         }
309
310         return null;
311     }
312
313     /**
314      * Returns the next objective ID for the NeighborSet given. If the nextObjectiveID does not exist,
315      * a new one is created and returned.
316      *
317      * @param deviceId Device ID
318      * @param ns NegighborSet
319      * @return next objective ID
320      */
321     public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
322         if (groupHandlerMap.get(deviceId) != null) {
323             log.trace("getNextObjectiveId query in device {}", deviceId);
324             return groupHandlerMap
325                     .get(deviceId).getNextObjectiveId(ns);
326         } else {
327             log.warn("getNextObjectiveId query in device {} not found", deviceId);
328             return -1;
329         }
330     }
331
332     private class InternalPacketProcessor implements PacketProcessor {
333         @Override
334         public void process(PacketContext context) {
335
336             if (context.isHandled()) {
337                 return;
338             }
339
340             InboundPacket pkt = context.inPacket();
341             Ethernet ethernet = pkt.parsed();
342
343             if (ethernet.getEtherType() == Ethernet.TYPE_ARP) {
344                 arpHandler.processPacketIn(pkt);
345             } else if (ethernet.getEtherType() == Ethernet.TYPE_IPV4) {
346                 IPv4 ipPacket = (IPv4) ethernet.getPayload();
347                 ipHandler.addToPacketBuffer(ipPacket);
348                 if (ipPacket.getProtocol() == IPv4.PROTOCOL_ICMP) {
349                     icmpHandler.processPacketIn(pkt);
350                 } else {
351                     ipHandler.processPacketIn(pkt);
352                 }
353             }
354         }
355     }
356
357     private class InternalLinkListener implements LinkListener {
358         @Override
359         public void event(LinkEvent event) {
360             if (event.type() == LinkEvent.Type.LINK_ADDED
361                     || event.type() == LinkEvent.Type.LINK_REMOVED) {
362                 log.debug("Event {} received from Link Service", event.type());
363                 scheduleEventHandlerIfNotScheduled(event);
364             }
365         }
366     }
367
368     private class InternalDeviceListener implements DeviceListener {
369         @Override
370         public void event(DeviceEvent event) {
371             switch (event.type()) {
372             case DEVICE_ADDED:
373             case PORT_REMOVED:
374             case DEVICE_UPDATED:
375             case DEVICE_AVAILABILITY_CHANGED:
376                 log.debug("Event {} received from Device Service", event.type());
377                 scheduleEventHandlerIfNotScheduled(event);
378                 break;
379             default:
380             }
381         }
382     }
383
384     private void scheduleEventHandlerIfNotScheduled(Event event) {
385         synchronized (threadSchedulerLock) {
386             eventQueue.add(event);
387             numOfEventsQueued++;
388
389             if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
390                 //No pending scheduled event handling threads. So start a new one.
391                 eventHandlerFuture = executorService
392                         .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
393                 numOfHandlerScheduled++;
394             }
395             log.trace("numOfEventsQueued {}, numOfEventHanlderScheduled {}",
396                       numOfEventsQueued,
397                       numOfHandlerScheduled);
398         }
399     }
400
401     private class InternalEventHandler implements Runnable {
402         @Override
403         public void run() {
404             try {
405                 while (true) {
406                     Event event = null;
407                     synchronized (threadSchedulerLock) {
408                         if (!eventQueue.isEmpty()) {
409                             event = eventQueue.poll();
410                             numOfEventsExecuted++;
411                         } else {
412                             numOfHandlerExecution++;
413                             log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
414                                       numOfHandlerExecution, numOfEventsExecuted);
415                             break;
416                         }
417                     }
418                     if (event.type() == LinkEvent.Type.LINK_ADDED) {
419                         processLinkAdded((Link) event.subject());
420                     } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
421                         processLinkRemoved((Link) event.subject());
422                     } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
423                             event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
424                             event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
425                         if (deviceService.isAvailable(((Device) event.subject()).id())) {
426                             processDeviceAdded((Device) event.subject());
427                         }
428                     } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
429                         processPortRemoved((Device) event.subject(),
430                                            ((DeviceEvent) event).port());
431                     } else {
432                         log.warn("Unhandled event type: {}", event.type());
433                     }
434                 }
435             } catch (Exception e) {
436                 log.error("SegmentRouting event handler "
437                         + "thread thrown an exception: {}", e);
438             }
439         }
440     }
441
442     private void processLinkAdded(Link link) {
443         log.debug("A new link {} was added", link.toString());
444
445         //Irrespective whether the local is a MASTER or not for this device,
446         //create group handler instance and push default TTP flow rules.
447         //Because in a multi-instance setup, instances can initiate
448         //groups for any devices. Also the default TTP rules are needed
449         //to be pushed before inserting any IP table entries for any device
450         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
451                 .deviceId());
452         if (groupHandler != null) {
453             groupHandler.linkUp(link);
454         } else {
455             Device device = deviceService.getDevice(link.src().deviceId());
456             if (device != null) {
457                 log.warn("processLinkAdded: Link Added "
458                         + "Notification without Device Added "
459                         + "event, still handling it");
460                 processDeviceAdded(device);
461                 groupHandler = groupHandlerMap.get(link.src()
462                                                    .deviceId());
463                 groupHandler.linkUp(link);
464             }
465         }
466
467         log.trace("Starting optimized route population process");
468         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
469         //log.trace("processLinkAdded: re-starting route population process");
470         //defaultRoutingHandler.startPopulationProcess();
471     }
472
473     private void processLinkRemoved(Link link) {
474         log.debug("A link {} was removed", link.toString());
475         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src().deviceId());
476         if (groupHandler != null) {
477             groupHandler.portDown(link.src().port());
478         }
479         log.trace("Starting optimized route population process");
480         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
481         //log.trace("processLinkRemoved: re-starting route population process");
482         //defaultRoutingHandler.startPopulationProcess();
483     }
484
485     private void processDeviceAdded(Device device) {
486         log.debug("A new device with ID {} was added", device.id());
487         //Irrespective whether the local is a MASTER or not for this device,
488         //create group handler instance and push default TTP flow rules.
489         //Because in a multi-instance setup, instances can initiate
490         //groups for any devices. Also the default TTP rules are needed
491         //to be pushed before inserting any IP table entries for any device
492         DefaultGroupHandler dgh = DefaultGroupHandler.
493                 createGroupHandler(device.id(),
494                                    appId,
495                                    deviceConfiguration,
496                                    linkService,
497                                    flowObjectiveService,
498                                    nsNextObjStore);
499         groupHandlerMap.put(device.id(), dgh);
500         defaultRoutingHandler.populateTtpRules(device.id());
501     }
502
503     private void processPortRemoved(Device device, Port port) {
504         log.debug("Port {} was removed", port.toString());
505         DefaultGroupHandler groupHandler = groupHandlerMap.get(device.id());
506         if (groupHandler != null) {
507             groupHandler.portDown(port.number());
508         }
509     }
510
511     private class InternalConfigListener implements NetworkConfigListener {
512         SegmentRoutingManager segmentRoutingManager;
513
514         public InternalConfigListener(SegmentRoutingManager srMgr) {
515             this.segmentRoutingManager = srMgr;
516         }
517
518         public void configureNetwork() {
519             deviceConfiguration = new DeviceConfiguration(segmentRoutingManager.cfgService);
520
521             arpHandler = new ArpHandler(segmentRoutingManager);
522             icmpHandler = new IcmpHandler(segmentRoutingManager);
523             ipHandler = new IpHandler(segmentRoutingManager);
524             routingRulePopulator = new RoutingRulePopulator(segmentRoutingManager);
525             defaultRoutingHandler = new DefaultRoutingHandler(segmentRoutingManager);
526
527             tunnelHandler = new TunnelHandler(linkService, deviceConfiguration,
528                                               groupHandlerMap, tunnelStore);
529             policyHandler = new PolicyHandler(appId, deviceConfiguration,
530                                               flowObjectiveService,
531                                               tunnelHandler, policyStore);
532
533             for (Device device : deviceService.getDevices()) {
534                 //Irrespective whether the local is a MASTER or not for this device,
535                 //create group handler instance and push default TTP flow rules.
536                 //Because in a multi-instance setup, instances can initiate
537                 //groups for any devices. Also the default TTP rules are needed
538                 //to be pushed before inserting any IP table entries for any device
539                 DefaultGroupHandler groupHandler = DefaultGroupHandler
540                         .createGroupHandler(device.id(), appId,
541                                             deviceConfiguration, linkService,
542                                             flowObjectiveService,
543                                             nsNextObjStore);
544                 groupHandlerMap.put(device.id(), groupHandler);
545                 defaultRoutingHandler.populateTtpRules(device.id());
546             }
547
548             defaultRoutingHandler.startPopulationProcess();
549         }
550
551         @Override
552         public void event(NetworkConfigEvent event) {
553             if (event.configClass().equals(SegmentRoutingConfig.class)) {
554                 if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
555                     log.info("Network configuration added.");
556                     configureNetwork();
557                 }
558                 if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
559                     log.info("Network configuration updated.");
560                     // TODO support dynamic configuration
561                 }
562             }
563         }
564     }
565 }