2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.provider.of.flow.impl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalCause;
import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Modified;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.flow.CompletedBatchOperation;
import org.onosproject.net.flow.DefaultTableStatisticsEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.FlowRuleBatchEntry;
import org.onosproject.net.flow.FlowRuleBatchOperation;
import org.onosproject.net.flow.FlowRuleExtPayLoad;
import org.onosproject.net.flow.FlowRuleProvider;
import org.onosproject.net.flow.FlowRuleProviderRegistry;
import org.onosproject.net.flow.FlowRuleProviderService;
import org.onosproject.net.flow.TableStatisticsEntry;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.openflow.controller.Dpid;
import org.onosproject.openflow.controller.OpenFlowController;
import org.onosproject.openflow.controller.OpenFlowEventListener;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.OpenFlowSwitchListener;
import org.onosproject.openflow.controller.RoleState;
import org.onosproject.openflow.controller.ThirdPartyMessage;
import org.osgi.service.component.ComponentContext;
import org.projectfloodlight.openflow.protocol.OFBadRequestCode;
import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
import org.projectfloodlight.openflow.protocol.OFErrorMsg;
import org.projectfloodlight.openflow.protocol.OFErrorType;
import org.projectfloodlight.openflow.protocol.OFFlowMod;
import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
import org.projectfloodlight.openflow.protocol.OFMessage;
import org.projectfloodlight.openflow.protocol.OFPortStatus;
import org.projectfloodlight.openflow.protocol.OFStatsReply;
import org.projectfloodlight.openflow.protocol.OFStatsType;
import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
import org.slf4j.Logger;

import java.util.Collections;
import java.util.Dictionary;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static org.onlab.util.Tools.get;
import static org.slf4j.LoggerFactory.getLogger;
/**
 * Provider which uses an OpenFlow controller to install and remove flow
 * rules on switches and to collect their flow and table statistics.
 */
94 @Component(immediate = true)
95 public class OpenFlowRuleProvider extends AbstractProvider
96 implements FlowRuleProvider {
98 private final Logger log = getLogger(getClass());
100 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101 protected FlowRuleProviderRegistry providerRegistry;
103 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104 protected OpenFlowController controller;
106 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107 protected ComponentConfigService cfgService;
109 private static final int DEFAULT_POLL_FREQUENCY = 5;
110 @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
111 label = "Frequency (in seconds) for polling flow statistics")
112 private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
114 private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
115 @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
116 label = "Adaptive Flow Sampling is on or off")
117 private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
119 private FlowRuleProviderService providerService;
121 private final InternalFlowProvider listener = new InternalFlowProvider();
123 private Cache<Long, InternalCacheEntry> pendingBatches;
125 private final Timer timer = new Timer("onos-openflow-collector");
126 private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
128 // NewAdaptiveFlowStatsCollector Set
129 private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
130 private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
131 private final Map<Dpid, TableStatisticsCollector> tableStatsCollectors = Maps.newHashMap();
/**
 * Creates the OpenFlow flow rule provider, identified by the provider id
 * {@code ("of", "org.onosproject.provider.openflow")}.
 */
public OpenFlowRuleProvider() {
    super(new ProviderId("of", "org.onosproject.provider.openflow"));
}
141 public void activate(ComponentContext context) {
142 cfgService.registerProperties(getClass());
143 providerService = providerRegistry.register(this);
144 controller.addListener(listener);
145 controller.addEventListener(listener);
147 pendingBatches = createBatchCache();
151 log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
152 flowPollFrequency, adaptiveFlowSampling);
156 public void deactivate(ComponentContext context) {
157 cfgService.unregisterProperties(getClass(), false);
159 providerRegistry.unregister(this);
160 providerService = null;
166 public void modified(ComponentContext context) {
167 Dictionary<?, ?> properties = context.getProperties();
168 int newFlowPollFrequency;
170 String s = get(properties, "flowPollFrequency");
171 newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
173 } catch (NumberFormatException | ClassCastException e) {
174 newFlowPollFrequency = flowPollFrequency;
177 if (newFlowPollFrequency != flowPollFrequency) {
178 flowPollFrequency = newFlowPollFrequency;
182 log.info("Settings: flowPollFrequency={}", flowPollFrequency);
184 boolean newAdaptiveFlowSampling;
185 String s = get(properties, "adaptiveFlowSampling");
186 newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
188 if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
189 // stop previous collector
191 adaptiveFlowSampling = newAdaptiveFlowSampling;
192 // create new collectors
196 log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
199 private Cache<Long, InternalCacheEntry> createBatchCache() {
200 return CacheBuilder.newBuilder()
201 .expireAfterWrite(10, TimeUnit.SECONDS)
202 .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
203 if (notification.getCause() == RemovalCause.EXPIRED) {
204 providerService.batchOperationCompleted(notification.getKey(),
205 notification.getValue().failedCompletion());
210 private void createCollectors() {
211 controller.getSwitches().forEach(this::createCollector);
214 private void createCollector(OpenFlowSwitch sw) {
215 if (adaptiveFlowSampling) {
216 // NewAdaptiveFlowStatsCollector Constructor
217 NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
219 afsCollectors.put(new Dpid(sw.getId()), fsc);
221 FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
223 simpleCollectors.put(new Dpid(sw.getId()), fsc);
225 TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency);
227 tableStatsCollectors.put(new Dpid(sw.getId()), tsc);
230 private void stopCollectors() {
231 if (adaptiveFlowSampling) {
232 // NewAdaptiveFlowStatsCollector Destructor
233 afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
234 afsCollectors.clear();
236 simpleCollectors.values().forEach(FlowStatsCollector::stop);
237 simpleCollectors.clear();
239 tableStatsCollectors.values().forEach(TableStatisticsCollector::stop);
240 tableStatsCollectors.clear();
243 private void adjustRate() {
244 DefaultLoad.setPollInterval(flowPollFrequency);
245 if (adaptiveFlowSampling) {
246 // NewAdaptiveFlowStatsCollector calAndPollInterval
247 afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
249 simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
251 tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
255 public void applyFlowRule(FlowRule... flowRules) {
256 for (FlowRule flowRule : flowRules) {
261 private void applyRule(FlowRule flowRule) {
262 Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
263 OpenFlowSwitch sw = controller.getSwitch(dpid);
265 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
266 if (hasPayload(flowRuleExtPayLoad)) {
267 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
271 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
272 Optional.empty()).buildFlowAdd());
274 if (adaptiveFlowSampling) {
275 // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
276 NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
277 if (collector != null) {
278 collector.addWithFlowRule(flowRule);
284 public void removeFlowRule(FlowRule... flowRules) {
285 for (FlowRule flowRule : flowRules) {
286 removeRule(flowRule);
290 private void removeRule(FlowRule flowRule) {
291 Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
292 OpenFlowSwitch sw = controller.getSwitch(dpid);
294 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
295 if (hasPayload(flowRuleExtPayLoad)) {
296 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
300 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
301 Optional.empty()).buildFlowDel());
303 if (adaptiveFlowSampling) {
304 // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
305 NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
306 if (collector != null) {
307 collector.removeFlows(flowRule);
313 public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
314 // TODO: optimize using the ApplicationId
315 removeFlowRule(flowRules);
319 public void executeBatch(FlowRuleBatchOperation batch) {
322 pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
324 Dpid dpid = Dpid.dpid(batch.deviceId().uri());
325 OpenFlowSwitch sw = controller.getSwitch(dpid);
327 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
328 // flow is the third party privacy flow
330 FlowRuleExtPayLoad flowRuleExtPayLoad = fbe.target().payLoad();
331 if (hasPayload(flowRuleExtPayLoad)) {
332 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
336 FlowModBuilder builder =
337 FlowModBuilder.builder(fbe.target(), sw.factory(), Optional.of(batch.id()));
338 NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
339 switch (fbe.operator()) {
341 mod = builder.buildFlowAdd();
342 if (adaptiveFlowSampling && collector != null) {
343 // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
344 collector.addWithFlowRule(fbe.target());
348 mod = builder.buildFlowDel();
349 if (adaptiveFlowSampling && collector != null) {
350 // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
351 collector.removeFlows(fbe.target());
355 mod = builder.buildFlowMod();
356 if (adaptiveFlowSampling && collector != null) {
357 // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
358 // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
359 collector.addOrUpdateFlows((FlowEntry) fbe.target());
363 log.error("Unsupported batch operation {}; skipping flowmod {}",
364 fbe.operator(), fbe);
369 OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
371 sw.sendMsg(builder.build());
374 private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
375 return flowRuleExtPayLoad != null &&
376 flowRuleExtPayLoad.payLoad() != null &&
377 flowRuleExtPayLoad.payLoad().length > 0;
380 private class InternalFlowProvider
381 implements OpenFlowSwitchListener, OpenFlowEventListener {
384 public void switchAdded(Dpid dpid) {
386 OpenFlowSwitch sw = controller.getSwitch(dpid);
388 createCollector(controller.getSwitch(dpid));
392 public void switchRemoved(Dpid dpid) {
393 if (adaptiveFlowSampling) {
394 NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
395 if (collector != null) {
399 FlowStatsCollector collector = simpleCollectors.remove(dpid);
400 if (collector != null) {
404 TableStatisticsCollector tsc = tableStatsCollectors.remove(dpid);
411 public void switchChanged(Dpid dpid) {
415 public void portChanged(Dpid dpid, OFPortStatus status) {
416 // TODO: Decide whether to evict flows internal store.
420 public void handleMessage(Dpid dpid, OFMessage msg) {
421 OpenFlowSwitch sw = controller.getSwitch(dpid);
422 switch (msg.getType()) {
424 OFFlowRemoved removed = (OFFlowRemoved) msg;
426 FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
427 providerService.flowRemoved(fr);
429 if (adaptiveFlowSampling) {
430 // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
431 NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
432 if (collector != null) {
433 collector.flowRemoved(fr);
438 if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
439 pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
440 } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
441 pushTableStatistics(dpid, (OFTableStatsReply) msg);
446 InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
449 .batchOperationCompleted(msg.getXid(),
452 log.warn("Received unknown Barrier Reply: {}",
456 pendingBatches.invalidate(msg.getXid());
460 // TODO: This needs to get suppressed in a better way.
461 if (msg instanceof OFBadRequestErrorMsg &&
462 ((OFBadRequestErrorMsg) msg).getCode() == OFBadRequestCode.BAD_TYPE) {
463 log.debug("Received error message {} from {}", msg, dpid);
465 log.warn("Received error message {} from {}", msg, dpid);
468 OFErrorMsg error = (OFErrorMsg) msg;
469 if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
470 OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
471 if (fmFailed.getData().getParsedMessage().isPresent()) {
472 OFMessage m = fmFailed.getData().getParsedMessage().get();
473 OFFlowMod fm = (OFFlowMod) m;
474 InternalCacheEntry entry =
475 pendingBatches.getIfPresent(msg.getXid());
477 entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
479 log.error("No matching batch for this error: {}", error);
482 // FIXME: Potentially add flowtracking to avoid this message.
483 log.error("Flow installation failed but switch didn't"
484 + " tell us which one.");
489 log.debug("Unhandled message type: {}", msg.getType());
494 public void receivedRoleReply(Dpid dpid, RoleState requested,
495 RoleState response) {
496 // Do nothing here for now.
499 private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
501 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
503 List<FlowEntry> flowEntries = replies.getEntries().stream()
504 .map(entry -> new FlowEntryBuilder(dpid, entry).build())
505 .collect(Collectors.toList());
507 if (adaptiveFlowSampling) {
508 NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
510 synchronized (afsc) {
511 if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
512 log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
513 + "OFFlowStatsReply Xid={}, for {}",
514 afsc.getFlowMissingXid(), replies.getXid(), dpid);
517 // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
518 if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
519 if (afsc.getFlowMissingXid() == replies.getXid()) {
520 // call entire flow stats update with flowMissing synchronization.
521 // used existing pushFlowMetrics
522 providerService.pushFlowMetrics(did, flowEntries);
524 // reset flowMissingXid to NO_FLOW_MISSING_XID
525 afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
528 // call individual flow stats update
529 providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
532 // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
533 afsc.pushFlowMetrics(flowEntries);
536 // call existing entire flow stats update with flowMissing synchronization
537 providerService.pushFlowMetrics(did, flowEntries);
541 private void pushTableStatistics(Dpid dpid, OFTableStatsReply replies) {
543 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
544 List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
545 .map(entry -> buildTableStatistics(did, entry))
546 .filter(Objects::nonNull)
547 .collect(Collectors.toList());
548 providerService.pushTableStatistics(did, tableStatsEntries);
551 private TableStatisticsEntry buildTableStatistics(DeviceId deviceId,
552 OFTableStatsEntry ofEntry) {
553 TableStatisticsEntry entry = null;
554 if (ofEntry != null) {
555 entry = new DefaultTableStatisticsEntry(deviceId,
556 ofEntry.getTableId().getValue(),
557 ofEntry.getActiveCount(),
558 ofEntry.getLookupCount().getValue(),
559 ofEntry.getMatchedCount().getValue());
568 * The internal cache entry holding the original request as well as
569 * accumulating the any failures along the way.
571 * If this entry is evicted from the cache then the entire operation is
572 * considered failed. Otherwise, only the failures reported by the device
573 * will be propagated up.
575 private class InternalCacheEntry {
577 private final FlowRuleBatchOperation operation;
578 private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
580 public InternalCacheEntry(FlowRuleBatchOperation operation) {
581 this.operation = operation;
585 * Appends a failed rule to the set of failed items.
587 * @param rule the failed rule
589 public void appendFailure(FlowRule rule) {
594 * Fails the entire batch and returns the failed operation.
596 * @return the failed operation
598 public CompletedBatchOperation failedCompletion() {
599 Set<FlowRule> fails = operation.getOperations().stream()
600 .map(op -> op.target()).collect(Collectors.toSet());
601 return new CompletedBatchOperation(false,
603 .unmodifiableSet(fails),
604 operation.deviceId());
608 * Returns the completed operation and whether the batch suceeded.
610 * @return the completed operation
612 public CompletedBatchOperation completed() {
613 return new CompletedBatchOperation(
616 .unmodifiableSet(failures),
617 operation.deviceId());