874faabfe9f879975632fbda941cd233f1a2a922
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.segmentrouting;
17
18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.packet.Ethernet;
25 import org.onlab.packet.IPv4;
26 import org.onlab.util.KryoNamespace;
27 import org.onosproject.core.ApplicationId;
28 import org.onosproject.core.CoreService;
29 import org.onosproject.event.Event;
30 import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
31 import org.onosproject.segmentrouting.grouphandler.NeighborSet;
32 import org.onosproject.segmentrouting.grouphandler.NeighborSetNextObjectiveStoreKey;
33 import org.onosproject.mastership.MastershipService;
34 import org.onosproject.net.Device;
35 import org.onosproject.net.DeviceId;
36 import org.onosproject.net.Link;
37 import org.onosproject.net.Port;
38 import org.onosproject.net.device.DeviceEvent;
39 import org.onosproject.net.device.DeviceListener;
40 import org.onosproject.net.device.DeviceService;
41 import org.onosproject.net.flowobjective.FlowObjectiveService;
42 import org.onosproject.net.group.GroupKey;
43 import org.onosproject.net.host.HostService;
44 import org.onosproject.net.intent.IntentService;
45 import org.onosproject.net.link.LinkEvent;
46 import org.onosproject.net.link.LinkListener;
47 import org.onosproject.net.link.LinkService;
48 import org.onosproject.net.packet.InboundPacket;
49 import org.onosproject.net.packet.PacketContext;
50 import org.onosproject.net.packet.PacketProcessor;
51 import org.onosproject.net.packet.PacketService;
52 import org.onosproject.net.topology.TopologyService;
53 import org.onosproject.segmentrouting.config.NetworkConfigManager;
54 import org.onosproject.store.service.EventuallyConsistentMap;
55 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
56 import org.onosproject.store.service.StorageService;
57 import org.onosproject.store.service.WallClockTimestamp;
58 import org.slf4j.Logger;
59 import org.slf4j.LoggerFactory;
60
61 import java.net.URI;
62 import java.util.HashSet;
63 import java.util.List;
64 import java.util.Map;
65 import java.util.concurrent.ConcurrentHashMap;
66 import java.util.concurrent.ConcurrentLinkedQueue;
67 import java.util.concurrent.Executors;
68 import java.util.concurrent.ScheduledExecutorService;
69 import java.util.concurrent.ScheduledFuture;
70 import java.util.concurrent.TimeUnit;
71
72 @SuppressWarnings("ALL")
73 @Service
74 @Component(immediate = true)
75 public class SegmentRoutingManager implements SegmentRoutingService {
76
77     private static Logger log = LoggerFactory
78             .getLogger(SegmentRoutingManager.class);
79
80     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
81     protected CoreService coreService;
82
83     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
84     protected TopologyService topologyService;
85
86     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
87     protected PacketService packetService;
88
89     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
90     protected IntentService intentService;
91
92     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
93     protected HostService hostService;
94
95     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
96     protected DeviceService deviceService;
97
98     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
99     protected FlowObjectiveService flowObjectiveService;
100
101     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
102     protected LinkService linkService;
103
104     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
105     protected MastershipService mastershipService;
106
107     protected ArpHandler arpHandler = null;
108     protected IcmpHandler icmpHandler = null;
109     protected IpHandler ipHandler = null;
110     protected RoutingRulePopulator routingRulePopulator = null;
111     protected ApplicationId appId;
112     protected DeviceConfiguration deviceConfiguration = null;
113
114
115     private DefaultRoutingHandler defaultRoutingHandler = null;
116     private TunnelHandler tunnelHandler = null;
117     private PolicyHandler policyHandler = null;
118     private InternalPacketProcessor processor = new InternalPacketProcessor();
119     private InternalEventHandler eventHandler = new InternalEventHandler();
120
121     private ScheduledExecutorService executorService = Executors
122             .newScheduledThreadPool(1);
123
124     private static ScheduledFuture<?> eventHandlerFuture = null;
125     private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<Event>();
126     private Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<DeviceId, DefaultGroupHandler>();
127     // Per device next objective ID store with (device id + neighbor set) as key
128     private EventuallyConsistentMap<NeighborSetNextObjectiveStoreKey,
129         Integer> nsNextObjStore = null;
130     private EventuallyConsistentMap<String, Tunnel> tunnelStore = null;
131     private EventuallyConsistentMap<String, Policy> policyStore = null;
132
133     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
134     protected StorageService storageService;
135
136     private NetworkConfigManager networkConfigService = new NetworkConfigManager();;
137
138     private Object threadSchedulerLock = new Object();
139     private static int numOfEventsQueued = 0;
140     private static int numOfEventsExecuted = 0;
141     private static int numOfHandlerExecution = 0;
142     private static int numOfHandlerScheduled = 0;
143
144     private KryoNamespace.Builder kryoBuilder = null;
145
146     @Activate
147     protected void activate() {
148         appId = coreService
149                 .registerApplication("org.onosproject.segmentrouting");
150
151         kryoBuilder = new KryoNamespace.Builder()
152             .register(NeighborSetNextObjectiveStoreKey.class,
153                     NeighborSet.class,
154                     DeviceId.class,
155                     URI.class,
156                     WallClockTimestamp.class,
157                     org.onosproject.cluster.NodeId.class,
158                     HashSet.class,
159                     Tunnel.class,
160                     DefaultTunnel.class,
161                     Policy.class,
162                     TunnelPolicy.class,
163                     Policy.Type.class
164             );
165
166         log.debug("Creating EC map nsnextobjectivestore");
167         EventuallyConsistentMapBuilder<NeighborSetNextObjectiveStoreKey, Integer>
168                 nsNextObjMapBuilder = storageService.eventuallyConsistentMapBuilder();
169
170         nsNextObjStore = nsNextObjMapBuilder
171                 .withName("nsnextobjectivestore")
172                 .withSerializer(kryoBuilder)
173                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
174                 .build();
175         log.trace("Current size {}", nsNextObjStore.size());
176
177         EventuallyConsistentMapBuilder<String, Tunnel> tunnelMapBuilder =
178                 storageService.eventuallyConsistentMapBuilder();
179
180         tunnelStore = tunnelMapBuilder
181                 .withName("tunnelstore")
182                 .withSerializer(kryoBuilder)
183                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
184                 .build();
185
186         EventuallyConsistentMapBuilder<String, Policy> policyMapBuilder =
187                 storageService.eventuallyConsistentMapBuilder();
188
189         policyStore = policyMapBuilder
190                 .withName("policystore")
191                 .withSerializer(kryoBuilder)
192                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
193                 .build();
194
195         networkConfigService.init();
196         deviceConfiguration = new DeviceConfiguration(networkConfigService);
197         arpHandler = new ArpHandler(this);
198         icmpHandler = new IcmpHandler(this);
199         ipHandler = new IpHandler(this);
200         routingRulePopulator = new RoutingRulePopulator(this);
201         defaultRoutingHandler = new DefaultRoutingHandler(this);
202         tunnelHandler = new TunnelHandler(linkService, deviceConfiguration,
203                 groupHandlerMap, tunnelStore);
204         policyHandler = new PolicyHandler(appId, deviceConfiguration,
205                 flowObjectiveService, tunnelHandler, policyStore);
206
207         packetService.addProcessor(processor, PacketProcessor.director(2));
208         linkService.addListener(new InternalLinkListener());
209         deviceService.addListener(new InternalDeviceListener());
210
211         for (Device device : deviceService.getDevices()) {
212             //Irrespective whether the local is a MASTER or not for this device,
213             //create group handler instance and push default TTP flow rules.
214             //Because in a multi-instance setup, instances can initiate
215             //groups for any devices. Also the default TTP rules are needed
216             //to be pushed before inserting any IP table entries for any device
217             DefaultGroupHandler groupHandler = DefaultGroupHandler
218                     .createGroupHandler(device.id(), appId,
219                                         deviceConfiguration, linkService,
220                                         flowObjectiveService,
221                                         nsNextObjStore);
222             groupHandlerMap.put(device.id(), groupHandler);
223             defaultRoutingHandler.populateTtpRules(device.id());
224         }
225
226         defaultRoutingHandler.startPopulationProcess();
227         log.info("Started");
228
229     }
230
231     @Deactivate
232     protected void deactivate() {
233         packetService.removeProcessor(processor);
234         processor = null;
235         log.info("Stopped");
236     }
237
238
239     @Override
240     public List<Tunnel> getTunnels() {
241         return tunnelHandler.getTunnels();
242     }
243
244     @Override
245     public TunnelHandler.Result createTunnel(Tunnel tunnel) {
246         return tunnelHandler.createTunnel(tunnel);
247     }
248
249     @Override
250     public TunnelHandler.Result removeTunnel(Tunnel tunnel) {
251         for (Policy policy: policyHandler.getPolicies()) {
252             if (policy.type() == Policy.Type.TUNNEL_FLOW) {
253                 TunnelPolicy tunnelPolicy = (TunnelPolicy) policy;
254                 if (tunnelPolicy.tunnelId().equals(tunnel.id())) {
255                     log.warn("Cannot remove the tunnel used by a policy");
256                     return TunnelHandler.Result.TUNNEL_IN_USE;
257                 }
258             }
259         }
260         return tunnelHandler.removeTunnel(tunnel);
261     }
262
263     @Override
264     public PolicyHandler.Result removePolicy(Policy policy) {
265         return policyHandler.removePolicy(policy);
266     }
267
268     @Override
269     public PolicyHandler.Result createPolicy(Policy policy) {
270         return policyHandler.createPolicy(policy);
271     }
272
273     @Override
274     public List<Policy> getPolicies() {
275         return policyHandler.getPolicies();
276     }
277
278     /**
279      * Returns the tunnel object with the tunnel ID.
280      *
281      * @param tunnelId Tunnel ID
282      * @return Tunnel reference
283      */
284     public Tunnel getTunnel(String tunnelId) {
285         return tunnelHandler.getTunnel(tunnelId);
286     }
287
288     /**
289      * Returns the GrouopKey object for the device and the NighborSet given.
290      *
291      * @param ns NeightborSet object for the GroupKey
292      * @return GroupKey object for the NeighborSet
293      */
294     public GroupKey getGroupKey(NeighborSet ns) {
295
296         for (DefaultGroupHandler groupHandler : groupHandlerMap.values()) {
297             return groupHandler.getGroupKey(ns);
298         }
299
300         return null;
301     }
302
303     /**
304      * Returns the next objective ID for the NeighborSet given. If the nextObjectiveID does not exist,
305      * a new one is created and returned.
306      *
307      * @param deviceId Device ID
308      * @param ns NegighborSet
309      * @return next objective ID
310      */
311     public int getNextObjectiveId(DeviceId deviceId, NeighborSet ns) {
312
313         if (groupHandlerMap.get(deviceId) != null) {
314             log.trace("getNextObjectiveId query in device {}", deviceId);
315             return groupHandlerMap
316                     .get(deviceId).getNextObjectiveId(ns);
317         } else {
318             log.warn("getNextObjectiveId query in device {} not found", deviceId);
319             return -1;
320         }
321     }
322
323     private class InternalPacketProcessor implements PacketProcessor {
324
325         @Override
326         public void process(PacketContext context) {
327
328             if (context.isHandled()) {
329                 return;
330             }
331
332             InboundPacket pkt = context.inPacket();
333             Ethernet ethernet = pkt.parsed();
334
335             if (ethernet.getEtherType() == Ethernet.TYPE_ARP) {
336                 arpHandler.processPacketIn(pkt);
337             } else if (ethernet.getEtherType() == Ethernet.TYPE_IPV4) {
338                 IPv4 ipPacket = (IPv4) ethernet.getPayload();
339                 ipHandler.addToPacketBuffer(ipPacket);
340                 if (ipPacket.getProtocol() == IPv4.PROTOCOL_ICMP) {
341                     icmpHandler.processPacketIn(pkt);
342                 } else {
343                     ipHandler.processPacketIn(pkt);
344                 }
345             }
346         }
347     }
348
349     private class InternalLinkListener implements LinkListener {
350         @Override
351         public void event(LinkEvent event) {
352             if (event.type() == LinkEvent.Type.LINK_ADDED
353                     || event.type() == LinkEvent.Type.LINK_REMOVED) {
354                 log.debug("Event {} received from Link Service", event.type());
355                 scheduleEventHandlerIfNotScheduled(event);
356             }
357         }
358     }
359
360     private class InternalDeviceListener implements DeviceListener {
361
362         @Override
363         public void event(DeviceEvent event) {
364             /*if (mastershipService.getLocalRole(event.subject().id()) != MastershipRole.MASTER) {
365                 log.debug("Local role {} is not MASTER for device {}",
366                           mastershipService.getLocalRole(event.subject().id()),
367                           event.subject().id());
368                 return;
369             }*/
370
371             switch (event.type()) {
372             case DEVICE_ADDED:
373             case PORT_REMOVED:
374             case DEVICE_UPDATED:
375             case DEVICE_AVAILABILITY_CHANGED:
376                 log.debug("Event {} received from Device Service", event.type());
377                 scheduleEventHandlerIfNotScheduled(event);
378                 break;
379             default:
380             }
381         }
382     }
383
384     private void scheduleEventHandlerIfNotScheduled(Event event) {
385
386         synchronized (threadSchedulerLock) {
387             eventQueue.add(event);
388             numOfEventsQueued++;
389
390             if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
391                 //No pending scheduled event handling threads. So start a new one.
392                 eventHandlerFuture = executorService
393                         .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
394                 numOfHandlerScheduled++;
395             }
396             log.trace("numOfEventsQueued {}, numOfEventHanlderScheduled {}",
397                       numOfEventsQueued,
398                       numOfHandlerScheduled);
399         }
400     }
401
402     private class InternalEventHandler implements Runnable {
403
404         @Override
405         public void run() {
406             try {
407                 while (true) {
408                     Event event = null;
409                     synchronized (threadSchedulerLock) {
410                         if (!eventQueue.isEmpty()) {
411                             event = eventQueue.poll();
412                             numOfEventsExecuted++;
413                         } else {
414                             numOfHandlerExecution++;
415                             log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
416                                       numOfHandlerExecution, numOfEventsExecuted);
417                             break;
418                         }
419                     }
420                     if (event.type() == LinkEvent.Type.LINK_ADDED) {
421                         processLinkAdded((Link) event.subject());
422                     } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
423                         processLinkRemoved((Link) event.subject());
424                     //} else if (event.type() == GroupEvent.Type.GROUP_ADDED) {
425                     //    processGroupAdded((Group) event.subject());
426                     } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
427                             event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
428                             event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
429                         if (deviceService.isAvailable(((Device) event.subject()).id())) {
430                             processDeviceAdded((Device) event.subject());
431                         }
432                     } else if (event.type() == DeviceEvent.Type.PORT_REMOVED) {
433                         processPortRemoved((Device) event.subject(),
434                                            ((DeviceEvent) event).port());
435                     } else {
436                         log.warn("Unhandled event type: {}", event.type());
437                     }
438                 }
439             } catch (Exception e) {
440                 log.error("SegmentRouting event handler "
441                         + "thread thrown an exception: {}", e);
442             }
443         }
444     }
445
446     private void processLinkAdded(Link link) {
447         log.debug("A new link {} was added", link.toString());
448
449         //Irrespective whether the local is a MASTER or not for this device,
450         //create group handler instance and push default TTP flow rules.
451         //Because in a multi-instance setup, instances can initiate
452         //groups for any devices. Also the default TTP rules are needed
453         //to be pushed before inserting any IP table entries for any device
454         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src()
455                 .deviceId());
456         if (groupHandler != null) {
457             groupHandler.linkUp(link);
458         } else {
459             Device device = deviceService.getDevice(link.src().deviceId());
460             if (device != null) {
461                 log.warn("processLinkAdded: Link Added "
462                         + "Notification without Device Added "
463                         + "event, still handling it");
464                 processDeviceAdded(device);
465                 groupHandler = groupHandlerMap.get(link.src()
466                                                    .deviceId());
467                 groupHandler.linkUp(link);
468             }
469         }
470
471         log.trace("Starting optimized route population process");
472         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(null);
473         //log.trace("processLinkAdded: re-starting route population process");
474         //defaultRoutingHandler.startPopulationProcess();
475     }
476
477     private void processLinkRemoved(Link link) {
478         log.debug("A link {} was removed", link.toString());
479         DefaultGroupHandler groupHandler = groupHandlerMap.get(link.src().deviceId());
480         if (groupHandler != null) {
481             groupHandler.portDown(link.src().port());
482         }
483         log.trace("Starting optimized route population process");
484         defaultRoutingHandler.populateRoutingRulesForLinkStatusChange(link);
485         //log.trace("processLinkRemoved: re-starting route population process");
486         //defaultRoutingHandler.startPopulationProcess();
487     }
488
489     private void processDeviceAdded(Device device) {
490         log.debug("A new device with ID {} was added", device.id());
491         //Irrespective whether the local is a MASTER or not for this device,
492         //create group handler instance and push default TTP flow rules.
493         //Because in a multi-instance setup, instances can initiate
494         //groups for any devices. Also the default TTP rules are needed
495         //to be pushed before inserting any IP table entries for any device
496         DefaultGroupHandler dgh = DefaultGroupHandler.
497                 createGroupHandler(device.id(),
498                                    appId,
499                                    deviceConfiguration,
500                                    linkService,
501                                    flowObjectiveService,
502                                    nsNextObjStore);
503         groupHandlerMap.put(device.id(), dgh);
504         defaultRoutingHandler.populateTtpRules(device.id());
505     }
506
507     private void processPortRemoved(Device device, Port port) {
508         log.debug("Port {} was removed", port.toString());
509         DefaultGroupHandler groupHandler = groupHandlerMap.get(device.id());
510         if (groupHandler != null) {
511             groupHandler.portDown(port.number());
512         }
513     }
514
515
516
517 }