de079e0303e690901df17aee577e7d9483b537c4
[onosfw.git] /
1 /*
2  * Copyright 2014 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.provider.of.flow.impl;
17
18 import com.google.common.cache.Cache;
19 import com.google.common.cache.CacheBuilder;
20 import com.google.common.cache.RemovalCause;
21 import com.google.common.cache.RemovalNotification;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Modified;
28 import org.apache.felix.scr.annotations.Property;
29 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.onosproject.cfg.ComponentConfigService;
32 import org.onosproject.core.ApplicationId;
33 import org.onosproject.net.DeviceId;
34 import org.onosproject.net.flow.CompletedBatchOperation;
35 import org.onosproject.net.flow.FlowEntry;
36 import org.onosproject.net.flow.FlowRule;
37 import org.onosproject.net.flow.FlowRuleBatchEntry;
38 import org.onosproject.net.flow.FlowRuleBatchOperation;
39 import org.onosproject.net.flow.FlowRuleExtPayLoad;
40 import org.onosproject.net.flow.FlowRuleProvider;
41 import org.onosproject.net.flow.FlowRuleProviderRegistry;
42 import org.onosproject.net.flow.FlowRuleProviderService;
43 import org.onosproject.net.provider.AbstractProvider;
44 import org.onosproject.net.provider.ProviderId;
45 import org.onosproject.net.statistic.DefaultLoad;
46 import org.onosproject.openflow.controller.Dpid;
47 import org.onosproject.openflow.controller.OpenFlowController;
48 import org.onosproject.openflow.controller.OpenFlowEventListener;
49 import org.onosproject.openflow.controller.OpenFlowSwitch;
50 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
51 import org.onosproject.openflow.controller.RoleState;
52 import org.onosproject.openflow.controller.ThirdPartyMessage;
53 import org.osgi.service.component.ComponentContext;
54 import org.projectfloodlight.openflow.protocol.OFBadRequestCode;
55 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
56 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
57 import org.projectfloodlight.openflow.protocol.OFErrorType;
58 import org.projectfloodlight.openflow.protocol.OFFlowMod;
59 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
60 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
61 import org.projectfloodlight.openflow.protocol.OFMessage;
62 import org.projectfloodlight.openflow.protocol.OFPortStatus;
63 import org.projectfloodlight.openflow.protocol.OFStatsReply;
64 import org.projectfloodlight.openflow.protocol.OFStatsType;
65 import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
66 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
67 import org.slf4j.Logger;
68
69 import java.util.Collections;
70 import java.util.Dictionary;
71 import java.util.List;
72 import java.util.Map;
73 import java.util.Optional;
74 import java.util.Set;
75 import java.util.Timer;
76 import java.util.concurrent.TimeUnit;
77 import java.util.stream.Collectors;
78
79 import static com.google.common.base.Strings.isNullOrEmpty;
80 import static org.onlab.util.Tools.get;
81 import static org.slf4j.LoggerFactory.getLogger;
82
83 /**
84  * Provider which uses an OpenFlow controller to detect network end-station
85  * hosts.
86  */
87 @Component(immediate = true)
88 public class OpenFlowRuleProvider extends AbstractProvider
89         implements FlowRuleProvider {
90
91     private final Logger log = getLogger(getClass());
92
93     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94     protected FlowRuleProviderRegistry providerRegistry;
95
96     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97     protected OpenFlowController controller;
98
99     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100     protected ComponentConfigService cfgService;
101
102     private static final int DEFAULT_POLL_FREQUENCY = 10;
103     @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
104             label = "Frequency (in seconds) for polling flow statistics")
105     private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
106
107     private FlowRuleProviderService providerService;
108
109     private final InternalFlowProvider listener = new InternalFlowProvider();
110
111     private Cache<Long, InternalCacheEntry> pendingBatches;
112
113     private final Timer timer = new Timer("onos-openflow-collector");
114     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
115
116     /**
117      * Creates an OpenFlow host provider.
118      */
119     public OpenFlowRuleProvider() {
120         super(new ProviderId("of", "org.onosproject.provider.openflow"));
121     }
122
123     @Activate
124     public void activate(ComponentContext context) {
125         cfgService.registerProperties(getClass());
126         providerService = providerRegistry.register(this);
127         controller.addListener(listener);
128         controller.addEventListener(listener);
129
130         pendingBatches = createBatchCache();
131         createCollectors();
132
133         log.info("Started");
134     }
135
136     @Deactivate
137     public void deactivate(ComponentContext context) {
138         cfgService.unregisterProperties(getClass(), false);
139         stopCollectors();
140         providerRegistry.unregister(this);
141         providerService = null;
142
143         log.info("Stopped");
144     }
145
146     @Modified
147     public void modified(ComponentContext context) {
148         Dictionary<?, ?> properties = context.getProperties();
149         int newFlowPollFrequency;
150         try {
151             String s = get(properties, "flowPollFrequency");
152             newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
153
154         } catch (NumberFormatException | ClassCastException e) {
155             newFlowPollFrequency = flowPollFrequency;
156         }
157
158         if (newFlowPollFrequency != flowPollFrequency) {
159             flowPollFrequency = newFlowPollFrequency;
160             adjustRate();
161         }
162
163         log.info("Settings: flowPollFrequency={}", flowPollFrequency);
164     }
165
166     private Cache<Long, InternalCacheEntry> createBatchCache() {
167         return CacheBuilder.newBuilder()
168                 .expireAfterWrite(10, TimeUnit.SECONDS)
169                 .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
170                     if (notification.getCause() == RemovalCause.EXPIRED) {
171                         providerService.batchOperationCompleted(notification.getKey(),
172                                                                 notification.getValue().failedCompletion());
173                     }
174                 }).build();
175     }
176
177     private void createCollectors() {
178         controller.getSwitches().forEach(this::createCollector);
179     }
180
181     private void createCollector(OpenFlowSwitch sw) {
182         FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
183         fsc.start();
184         collectors.put(new Dpid(sw.getId()), fsc);
185     }
186
187     private void stopCollectors() {
188         collectors.values().forEach(FlowStatsCollector::stop);
189         collectors.clear();
190     }
191
192     private void adjustRate() {
193         DefaultLoad.setPollInterval(flowPollFrequency);
194         collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
195     }
196
197     @Override
198     public void applyFlowRule(FlowRule... flowRules) {
199         for (FlowRule flowRule : flowRules) {
200             applyRule(flowRule);
201         }
202     }
203
204     private void applyRule(FlowRule flowRule) {
205         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
206                                                                    .uri()));
207         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
208         if (hasPayload(flowRuleExtPayLoad)) {
209             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
210             sw.sendMsg(msg);
211             return;
212         }
213         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
214                                           Optional.empty()).buildFlowAdd());
215     }
216
217     @Override
218     public void removeFlowRule(FlowRule... flowRules) {
219         for (FlowRule flowRule : flowRules) {
220             removeRule(flowRule);
221         }
222     }
223
224     private void removeRule(FlowRule flowRule) {
225         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
226                                                                    .uri()));
227         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
228         if (hasPayload(flowRuleExtPayLoad)) {
229             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
230             sw.sendMsg(msg);
231             return;
232         }
233         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
234                                           Optional.empty()).buildFlowDel());
235     }
236
237     @Override
238     public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
239         // TODO: optimize using the ApplicationId
240         removeFlowRule(flowRules);
241     }
242
243     @Override
244     public void executeBatch(FlowRuleBatchOperation batch) {
245
246         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
247
248         OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
249                                                                    .uri()));
250         OFFlowMod mod;
251         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
252             // flow is the third party privacy flow
253
254             FlowRuleExtPayLoad flowRuleExtPayLoad = fbe.target().payLoad();
255             if (hasPayload(flowRuleExtPayLoad)) {
256                 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
257                 sw.sendMsg(msg);
258                 continue;
259             }
260             FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
261                     .factory(), Optional.of(batch.id()));
262             switch (fbe.operator()) {
263                 case ADD:
264                     mod = builder.buildFlowAdd();
265                     break;
266                 case REMOVE:
267                     mod = builder.buildFlowDel();
268                     break;
269                 case MODIFY:
270                     mod = builder.buildFlowMod();
271                     break;
272                 default:
273                     log.error("Unsupported batch operation {}; skipping flowmod {}",
274                               fbe.operator(), fbe);
275                     continue;
276             }
277             sw.sendMsg(mod);
278         }
279         OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
280                 .setXid(batch.id());
281         sw.sendMsg(builder.build());
282     }
283
284     private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
285         return flowRuleExtPayLoad != null &&
286                 flowRuleExtPayLoad.payLoad() != null &&
287                 flowRuleExtPayLoad.payLoad().length > 0;
288     }
289
290     private class InternalFlowProvider
291             implements OpenFlowSwitchListener, OpenFlowEventListener {
292
293         @Override
294         public void switchAdded(Dpid dpid) {
295             createCollector(controller.getSwitch(dpid));
296         }
297
298         @Override
299         public void switchRemoved(Dpid dpid) {
300             FlowStatsCollector collector = collectors.remove(dpid);
301             if (collector != null) {
302                 collector.stop();
303             }
304         }
305
306         @Override
307         public void switchChanged(Dpid dpid) {
308         }
309
310         @Override
311         public void portChanged(Dpid dpid, OFPortStatus status) {
312             // TODO: Decide whether to evict flows internal store.
313         }
314
315         @Override
316         public void handleMessage(Dpid dpid, OFMessage msg) {
317             OpenFlowSwitch sw = controller.getSwitch(dpid);
318             switch (msg.getType()) {
319                 case FLOW_REMOVED:
320                     OFFlowRemoved removed = (OFFlowRemoved) msg;
321
322                     FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
323                     providerService.flowRemoved(fr);
324                     break;
325                 case STATS_REPLY:
326                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
327                         pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
328                     }
329                     break;
330                 case BARRIER_REPLY:
331                     try {
332                         InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
333                         if (entry != null) {
334                             providerService
335                                     .batchOperationCompleted(msg.getXid(),
336                                                              entry.completed());
337                         } else {
338                             log.warn("Received unknown Barrier Reply: {}",
339                                      msg.getXid());
340                         }
341                     } finally {
342                         pendingBatches.invalidate(msg.getXid());
343                     }
344                     break;
345                 case ERROR:
346                     // TODO: This needs to get suppressed in a better way.
347                     if (msg instanceof OFBadRequestErrorMsg &&
348                             ((OFBadRequestErrorMsg) msg).getCode() == OFBadRequestCode.BAD_TYPE) {
349                         log.debug("Received error message {} from {}", msg, dpid);
350                     } else {
351                         log.warn("Received error message {} from {}", msg, dpid);
352                     }
353
354                     OFErrorMsg error = (OFErrorMsg) msg;
355                     if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
356                         OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
357                         if (fmFailed.getData().getParsedMessage().isPresent()) {
358                             OFMessage m = fmFailed.getData().getParsedMessage().get();
359                             OFFlowMod fm = (OFFlowMod) m;
360                             InternalCacheEntry entry =
361                                     pendingBatches.getIfPresent(msg.getXid());
362                             if (entry != null) {
363                                 entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
364                             } else {
365                                 log.error("No matching batch for this error: {}", error);
366                             }
367                         } else {
368                             // FIXME: Potentially add flowtracking to avoid this message.
369                             log.error("Flow installation failed but switch didn't"
370                                               + " tell us which one.");
371                         }
372                     }
373                     break;
374                 default:
375                     log.debug("Unhandled message type: {}", msg.getType());
376             }
377
378         }
379
380         @Override
381         public void receivedRoleReply(Dpid dpid, RoleState requested,
382                                       RoleState response) {
383             // Do nothing here for now.
384         }
385
386         private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
387
388             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
389             OpenFlowSwitch sw = controller.getSwitch(dpid);
390
391             List<FlowEntry> flowEntries = replies.getEntries().stream()
392                     .map(entry -> new FlowEntryBuilder(dpid, entry).build())
393                     .collect(Collectors.toList());
394
395             providerService.pushFlowMetrics(did, flowEntries);
396         }
397     }
398
399     /**
400      * The internal cache entry holding the original request as well as
401      * accumulating the any failures along the way.
402      * <p/>
403      * If this entry is evicted from the cache then the entire operation is
404      * considered failed. Otherwise, only the failures reported by the device
405      * will be propagated up.
406      */
407     private class InternalCacheEntry {
408
409         private final FlowRuleBatchOperation operation;
410         private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
411
412         public InternalCacheEntry(FlowRuleBatchOperation operation) {
413             this.operation = operation;
414         }
415
416         /**
417          * Appends a failed rule to the set of failed items.
418          *
419          * @param rule the failed rule
420          */
421         public void appendFailure(FlowRule rule) {
422             failures.add(rule);
423         }
424
425         /**
426          * Fails the entire batch and returns the failed operation.
427          *
428          * @return the failed operation
429          */
430         public CompletedBatchOperation failedCompletion() {
431             Set<FlowRule> fails = operation.getOperations().stream()
432                     .map(op -> op.target()).collect(Collectors.toSet());
433             return new CompletedBatchOperation(false,
434                                                Collections
435                                                        .unmodifiableSet(fails),
436                                                operation.deviceId());
437         }
438
439         /**
440          * Returns the completed operation and whether the batch suceeded.
441          *
442          * @return the completed operation
443          */
444         public CompletedBatchOperation completed() {
445             return new CompletedBatchOperation(
446                     failures.isEmpty(),
447                     Collections
448                             .unmodifiableSet(failures),
449                     operation.deviceId());
450         }
451     }
452
453 }