5ecdc7a2ff735c878accb540abfb7d60e92b649f
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.net.flowobjective.impl;
17
18 import com.google.common.collect.Maps;
19 import com.google.common.collect.Sets;
20 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate;
23 import org.apache.felix.scr.annotations.Reference;
24 import org.apache.felix.scr.annotations.ReferenceCardinality;
25 import org.apache.felix.scr.annotations.Service;
26 import org.onlab.osgi.DefaultServiceDirectory;
27 import org.onlab.osgi.ServiceDirectory;
28 import org.onlab.util.ItemNotFoundException;
29 import org.onosproject.cluster.ClusterService;
30 import org.onosproject.mastership.MastershipEvent;
31 import org.onosproject.mastership.MastershipListener;
32 import org.onosproject.mastership.MastershipService;
33 import org.onosproject.net.DeviceId;
34 import org.onosproject.net.behaviour.Pipeliner;
35 import org.onosproject.net.behaviour.PipelinerContext;
36 import org.onosproject.net.device.DeviceEvent;
37 import org.onosproject.net.device.DeviceListener;
38 import org.onosproject.net.device.DeviceService;
39 import org.onosproject.net.driver.DefaultDriverProviderService;
40 import org.onosproject.net.driver.DriverHandler;
41 import org.onosproject.net.driver.DriverService;
42 import org.onosproject.net.flow.FlowRuleService;
43 import org.onosproject.net.flowobjective.FilteringObjective;
44 import org.onosproject.net.flowobjective.FlowObjectiveService;
45 import org.onosproject.net.flowobjective.FlowObjectiveStore;
46 import org.onosproject.net.flowobjective.FlowObjectiveStoreDelegate;
47 import org.onosproject.net.flowobjective.ForwardingObjective;
48 import org.onosproject.net.flowobjective.NextObjective;
49 import org.onosproject.net.flowobjective.Objective;
50 import org.onosproject.net.flowobjective.ObjectiveError;
51 import org.onosproject.net.flowobjective.ObjectiveEvent;
52 import org.onosproject.net.group.GroupService;
53 import org.slf4j.Logger;
54 import org.slf4j.LoggerFactory;
55
56 import java.util.Map;
57 import java.util.Objects;
58 import java.util.Set;
59 import java.util.concurrent.ExecutorService;
60
61 import static com.google.common.base.Preconditions.checkNotNull;
62 import static java.util.concurrent.Executors.newFixedThreadPool;
63 import static org.onlab.util.Tools.groupedThreads;
64 import static org.onosproject.security.AppGuard.checkPermission;
65 import static org.onosproject.security.AppPermission.Type.*;
66
67
68
69 /**
70  * Provides implementation of the flow objective programming service.
71  */
72 @Component(immediate = true)
73 @Service
74 public class FlowObjectiveManager implements FlowObjectiveService {
75
76     public static final int INSTALL_RETRY_ATTEMPTS = 5;
77     public static final long INSTALL_RETRY_INTERVAL = 1000; // ms
78
79     private final Logger log = LoggerFactory.getLogger(getClass());
80
81     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
82     protected DriverService driverService;
83
84     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
85     protected DeviceService deviceService;
86
87     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
88     protected MastershipService mastershipService;
89
90     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
91     protected ClusterService clusterService;
92
93     // Note: The following dependencies are added on behalf of the pipeline
94     // driver behaviours to assure these services are available for their
95     // initialization.
96     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97     protected FlowRuleService flowRuleService;
98
99     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100     protected GroupService groupService;
101
102     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
103     protected FlowObjectiveStore flowObjectiveStore;
104
105     // Note: This must remain an optional dependency to allow re-install of default drivers.
106     // Note: For now disabled until we can move to OPTIONAL_UNARY dependency
107     // @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
108     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
109     protected DefaultDriverProviderService defaultDriverService;
110
111     private final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
112
113     private final Map<DeviceId, DriverHandler> driverHandlers = Maps.newConcurrentMap();
114     private final Map<DeviceId, Pipeliner> pipeliners = Maps.newConcurrentMap();
115
116     private final PipelinerContext context = new InnerPipelineContext();
117     private final MastershipListener mastershipListener = new InnerMastershipListener();
118     private final DeviceListener deviceListener = new InnerDeviceListener();
119
120     protected ServiceDirectory serviceDirectory = new DefaultServiceDirectory();
121
122     private Map<Integer, Set<PendingNext>> pendingForwards = Maps.newConcurrentMap();
123
124     private ExecutorService executorService;
125
126     @Activate
127     protected void activate() {
128         executorService = newFixedThreadPool(4, groupedThreads("onos/objective-installer", "%d"));
129         flowObjectiveStore.setDelegate(delegate);
130         mastershipService.addListener(mastershipListener);
131         deviceService.addListener(deviceListener);
132         deviceService.getDevices().forEach(device -> setupPipelineHandler(device.id()));
133         log.info("Started");
134     }
135
136     @Deactivate
137     protected void deactivate() {
138         flowObjectiveStore.unsetDelegate(delegate);
139         mastershipService.removeListener(mastershipListener);
140         deviceService.removeListener(deviceListener);
141         executorService.shutdown();
142         pipeliners.clear();
143         driverHandlers.clear();
144         log.info("Stopped");
145     }
146
147     /**
148      * Task that passes the flow objective down to the driver. The task will
149      * make a few attempts to find the appropriate driver, then eventually give
150      * up and report an error if no suitable driver could be found.
151      */
152     private class ObjectiveInstaller implements Runnable {
153         private final DeviceId deviceId;
154         private final Objective objective;
155
156         private final int numAttempts;
157
158         public ObjectiveInstaller(DeviceId deviceId, Objective objective) {
159             this(deviceId, objective, 1);
160         }
161
162         public ObjectiveInstaller(DeviceId deviceId, Objective objective, int attemps) {
163             this.deviceId = checkNotNull(deviceId);
164             this.objective = checkNotNull(objective);
165             this.numAttempts = checkNotNull(attemps);
166         }
167
168         @Override
169         public void run() {
170             try {
171                 Pipeliner pipeliner = getDevicePipeliner(deviceId);
172
173                 if (pipeliner != null) {
174                     if (objective instanceof NextObjective) {
175                         pipeliner.next((NextObjective) objective);
176                     } else if (objective instanceof ForwardingObjective) {
177                         pipeliner.forward((ForwardingObjective) objective);
178                     } else {
179                         pipeliner.filter((FilteringObjective) objective);
180                     }
181                 } else if (numAttempts < INSTALL_RETRY_ATTEMPTS) {
182                     Thread.sleep(INSTALL_RETRY_INTERVAL);
183                     executorService.submit(new ObjectiveInstaller(deviceId, objective, numAttempts + 1));
184                 } else {
185                     // Otherwise we've tried a few times and failed, report an
186                     // error back to the user.
187                     objective.context().ifPresent(
188                             c -> c.onError(objective, ObjectiveError.DEVICEMISSING));
189                 }
190             } catch (Exception e) {
191                 log.warn("Exception while installing flow objective", e);
192             }
193         }
194     }
195
196     @Override
197     public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
198         checkPermission(FLOWRULE_WRITE);
199         executorService.submit(new ObjectiveInstaller(deviceId, filteringObjective));
200     }
201
202     @Override
203     public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
204         checkPermission(FLOWRULE_WRITE);
205         if (queueObjective(deviceId, forwardingObjective)) {
206             return;
207         }
208         executorService.submit(new ObjectiveInstaller(deviceId, forwardingObjective));
209     }
210
211     @Override
212     public void next(DeviceId deviceId, NextObjective nextObjective) {
213         checkPermission(FLOWRULE_WRITE);
214         executorService.submit(new ObjectiveInstaller(deviceId, nextObjective));
215     }
216
217     @Override
218     public int allocateNextId() {
219         checkPermission(FLOWRULE_WRITE);
220         return flowObjectiveStore.allocateNextId();
221     }
222
223     @Override
224     public void initPolicy(String policy) {}
225
226     private boolean queueObjective(DeviceId deviceId, ForwardingObjective fwd) {
227         if (fwd.nextId() != null &&
228                 flowObjectiveStore.getNextGroup(fwd.nextId()) == null) {
229             log.trace("Queuing forwarding objective for nextId {}", fwd.nextId());
230             // TODO: change to computeIfAbsent
231             Set<PendingNext> pnext = pendingForwards.putIfAbsent(fwd.nextId(),
232                                          Sets.newHashSet(new PendingNext(deviceId, fwd)));
233             if (pnext != null) {
234                 pnext.add(new PendingNext(deviceId, fwd));
235             }
236             return true;
237         }
238         return false;
239     }
240
241     // Retrieves the device pipeline behaviour from the cache.
242     private Pipeliner getDevicePipeliner(DeviceId deviceId) {
243         return pipeliners.get(deviceId);
244     }
245
246     private void setupPipelineHandler(DeviceId deviceId) {
247         if (defaultDriverService == null) {
248             // We're not ready to go to work yet.
249             return;
250         }
251
252         // Attempt to lookup the handler in the cache
253         DriverHandler handler = driverHandlers.get(deviceId);
254         cTime = now();
255
256         if (handler == null) {
257             try {
258                 // Otherwise create it and if it has pipeline behaviour, cache it
259                 handler = driverService.createHandler(deviceId);
260                 dTime = now();
261                 if (!handler.driver().hasBehaviour(Pipeliner.class)) {
262                     log.warn("Pipeline behaviour not supported for device {}",
263                              deviceId);
264                     return;
265                 }
266             } catch (ItemNotFoundException e) {
267                 log.warn("No applicable driver for device {}", deviceId);
268                 return;
269             }
270
271             driverHandlers.put(deviceId, handler);
272             eTime = now();
273         }
274
275         // Always (re)initialize the pipeline behaviour
276         log.info("Driver {} bound to device {} ... initializing driver",
277                  handler.driver().name(), deviceId);
278         hTime = now();
279         Pipeliner pipeliner = handler.behaviour(Pipeliner.class);
280         hbTime = now();
281         pipeliner.init(deviceId, context);
282         pipeliners.putIfAbsent(deviceId, pipeliner);
283     }
284
285     // Triggers driver setup when the local node becomes a device master.
286     private class InnerMastershipListener implements MastershipListener {
287         @Override
288         public void event(MastershipEvent event) {
289             switch (event.type()) {
290                 case MASTER_CHANGED:
291                     log.debug("mastership changed on device {}", event.subject());
292                     start = now();
293                     if (deviceService.isAvailable(event.subject())) {
294                         setupPipelineHandler(event.subject());
295                     }
296                     stopWatch();
297                     break;
298                 case BACKUPS_CHANGED:
299                     break;
300                 default:
301                     break;
302             }
303         }
304     }
305
306     // Triggers driver setup when a device is (re)detected.
307     private class InnerDeviceListener implements DeviceListener {
308         @Override
309         public void event(DeviceEvent event) {
310             switch (event.type()) {
311                 case DEVICE_ADDED:
312                 case DEVICE_AVAILABILITY_CHANGED:
313                     log.debug("Device either added or availability changed {}",
314                               event.subject().id());
315                     start = now();
316                     if (deviceService.isAvailable(event.subject().id())) {
317                         log.debug("Device is now available {}", event.subject().id());
318                         setupPipelineHandler(event.subject().id());
319                     }
320                     stopWatch();
321                     break;
322                 case DEVICE_UPDATED:
323                     break;
324                 case DEVICE_REMOVED:
325                     break;
326                 case DEVICE_SUSPENDED:
327                     break;
328                 case PORT_ADDED:
329                     break;
330                 case PORT_UPDATED:
331                     break;
332                 case PORT_REMOVED:
333                     break;
334                 default:
335                     break;
336             }
337         }
338     }
339
340     // Temporary mechanism to monitor pipeliner setup time-cost; there are
341     // intermittent time where this takes in excess of 2 seconds. Why?
342     private long start = 0, totals = 0, count = 0;
343     private long cTime, dTime, eTime, hTime, hbTime;
344     private static final long LIMIT = 500;
345
346     private long now() {
347         return System.currentTimeMillis();
348     }
349
350     private void stopWatch() {
351         long duration = System.currentTimeMillis() - start;
352         totals += duration;
353         count += 1;
354         if (duration > LIMIT) {
355             log.info("Pipeline setup took {} ms; avg {} ms; cTime={}, dTime={}, eTime={}, hTime={}, hbTime={}",
356                      duration, totals / count, diff(cTime), diff(dTime), diff(eTime), diff(hTime), diff(hbTime));
357         }
358     }
359
360     private long diff(long bTime) {
361         long diff = bTime - start;
362         return diff < 0 ? 0 : diff;
363     }
364
365     // Processing context for initializing pipeline driver behaviours.
366     private class InnerPipelineContext implements PipelinerContext {
367         @Override
368         public ServiceDirectory directory() {
369             return serviceDirectory;
370         }
371
372         @Override
373         public FlowObjectiveStore store() {
374             return flowObjectiveStore;
375         }
376     }
377
378     private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
379         @Override
380         public void notify(ObjectiveEvent event) {
381             log.debug("Received notification of obj event {}", event);
382             Set<PendingNext> pending = pendingForwards.remove(event.subject());
383
384             if (pending == null) {
385                 log.debug("Nothing pending for this obj event");
386                 return;
387             }
388
389             log.debug("Processing pending forwarding objectives {}", pending.size());
390
391             pending.forEach(p -> getDevicePipeliner(p.deviceId())
392                     .forward(p.forwardingObjective()));
393
394         }
395     }
396
397     /**
398      * Data class used to hold a pending forwarding objective that could not
399      * be processed because the associated next object was not present.
400      */
401     private class PendingNext {
402         private final DeviceId deviceId;
403         private final ForwardingObjective fwd;
404
405         public PendingNext(DeviceId deviceId, ForwardingObjective fwd) {
406             this.deviceId = deviceId;
407             this.fwd = fwd;
408         }
409
410         public DeviceId deviceId() {
411             return deviceId;
412         }
413
414         public ForwardingObjective forwardingObjective() {
415             return fwd;
416         }
417
418         @Override
419         public int hashCode() {
420             return Objects.hash(deviceId, fwd);
421         }
422
423         @Override
424         public boolean equals(final Object obj) {
425             if (this == obj) {
426                 return true;
427             }
428             if (!(obj instanceof PendingNext)) {
429                 return false;
430             }
431             final PendingNext other = (PendingNext) obj;
432             if (this.deviceId.equals(other.deviceId) &&
433                     this.fwd.equals(other.fwd)) {
434                 return true;
435             }
436             return false;
437         }
438     }
439 }