2 * Copyright 2014 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.provider.of.flow.impl;
18 import com.google.common.cache.Cache;
19 import com.google.common.cache.CacheBuilder;
20 import com.google.common.cache.RemovalCause;
21 import com.google.common.cache.RemovalNotification;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Modified;
28 import org.apache.felix.scr.annotations.Property;
29 import org.apache.felix.scr.annotations.Reference;
30 import org.apache.felix.scr.annotations.ReferenceCardinality;
31 import org.onosproject.cfg.ComponentConfigService;
32 import org.onosproject.core.ApplicationId;
33 import org.onosproject.net.DeviceId;
34 import org.onosproject.net.flow.CompletedBatchOperation;
35 import org.onosproject.net.flow.FlowEntry;
36 import org.onosproject.net.flow.FlowRule;
37 import org.onosproject.net.flow.FlowRuleBatchEntry;
38 import org.onosproject.net.flow.FlowRuleBatchOperation;
39 import org.onosproject.net.flow.FlowRuleExtPayLoad;
40 import org.onosproject.net.flow.FlowRuleProvider;
41 import org.onosproject.net.flow.FlowRuleProviderRegistry;
42 import org.onosproject.net.flow.FlowRuleProviderService;
43 import org.onosproject.net.provider.AbstractProvider;
44 import org.onosproject.net.provider.ProviderId;
45 import org.onosproject.net.statistic.DefaultLoad;
46 import org.onosproject.openflow.controller.Dpid;
47 import org.onosproject.openflow.controller.OpenFlowController;
48 import org.onosproject.openflow.controller.OpenFlowEventListener;
49 import org.onosproject.openflow.controller.OpenFlowSwitch;
50 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
51 import org.onosproject.openflow.controller.RoleState;
52 import org.onosproject.openflow.controller.ThirdPartyMessage;
53 import org.osgi.service.component.ComponentContext;
54 import org.projectfloodlight.openflow.protocol.OFBadRequestCode;
55 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
56 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
57 import org.projectfloodlight.openflow.protocol.OFErrorType;
58 import org.projectfloodlight.openflow.protocol.OFFlowMod;
59 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
60 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
61 import org.projectfloodlight.openflow.protocol.OFMessage;
62 import org.projectfloodlight.openflow.protocol.OFPortStatus;
63 import org.projectfloodlight.openflow.protocol.OFStatsReply;
64 import org.projectfloodlight.openflow.protocol.OFStatsType;
65 import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
66 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
67 import org.slf4j.Logger;
69 import java.util.Collections;
70 import java.util.Dictionary;
71 import java.util.List;
73 import java.util.Optional;
75 import java.util.Timer;
76 import java.util.concurrent.TimeUnit;
77 import java.util.stream.Collectors;
79 import static com.google.common.base.Strings.isNullOrEmpty;
80 import static org.onlab.util.Tools.get;
81 import static org.slf4j.LoggerFactory.getLogger;
84 * Provider which uses an OpenFlow controller to detect network end-station
87 @Component(immediate = true)
88 public class OpenFlowRuleProvider extends AbstractProvider
89 implements FlowRuleProvider {
91 private final Logger log = getLogger(getClass());
93 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
94 protected FlowRuleProviderRegistry providerRegistry;
96 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
97 protected OpenFlowController controller;
99 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
100 protected ComponentConfigService cfgService;
102 private static final int DEFAULT_POLL_FREQUENCY = 10;
103 @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
104 label = "Frequency (in seconds) for polling flow statistics")
105 private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
107 private FlowRuleProviderService providerService;
109 private final InternalFlowProvider listener = new InternalFlowProvider();
111 private Cache<Long, InternalCacheEntry> pendingBatches;
113 private final Timer timer = new Timer("onos-openflow-collector");
114 private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
117 * Creates an OpenFlow host provider.
119 public OpenFlowRuleProvider() {
120 super(new ProviderId("of", "org.onosproject.provider.openflow"));
124 public void activate(ComponentContext context) {
125 cfgService.registerProperties(getClass());
126 providerService = providerRegistry.register(this);
127 controller.addListener(listener);
128 controller.addEventListener(listener);
130 pendingBatches = createBatchCache();
137 public void deactivate(ComponentContext context) {
138 cfgService.unregisterProperties(getClass(), false);
140 providerRegistry.unregister(this);
141 providerService = null;
147 public void modified(ComponentContext context) {
148 Dictionary<?, ?> properties = context.getProperties();
149 int newFlowPollFrequency;
151 String s = get(properties, "flowPollFrequency");
152 newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
154 } catch (NumberFormatException | ClassCastException e) {
155 newFlowPollFrequency = flowPollFrequency;
158 if (newFlowPollFrequency != flowPollFrequency) {
159 flowPollFrequency = newFlowPollFrequency;
163 log.info("Settings: flowPollFrequency={}", flowPollFrequency);
166 private Cache<Long, InternalCacheEntry> createBatchCache() {
167 return CacheBuilder.newBuilder()
168 .expireAfterWrite(10, TimeUnit.SECONDS)
169 .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
170 if (notification.getCause() == RemovalCause.EXPIRED) {
171 providerService.batchOperationCompleted(notification.getKey(),
172 notification.getValue().failedCompletion());
177 private void createCollectors() {
178 controller.getSwitches().forEach(this::createCollector);
181 private void createCollector(OpenFlowSwitch sw) {
182 FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
184 collectors.put(new Dpid(sw.getId()), fsc);
187 private void stopCollectors() {
188 collectors.values().forEach(FlowStatsCollector::stop);
192 private void adjustRate() {
193 DefaultLoad.setPollInterval(flowPollFrequency);
194 collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
198 public void applyFlowRule(FlowRule... flowRules) {
199 for (FlowRule flowRule : flowRules) {
204 private void applyRule(FlowRule flowRule) {
205 OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
207 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
208 if (hasPayload(flowRuleExtPayLoad)) {
209 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
213 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
214 Optional.empty()).buildFlowAdd());
218 public void removeFlowRule(FlowRule... flowRules) {
219 for (FlowRule flowRule : flowRules) {
220 removeRule(flowRule);
224 private void removeRule(FlowRule flowRule) {
225 OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
227 FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
228 if (hasPayload(flowRuleExtPayLoad)) {
229 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
233 sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
234 Optional.empty()).buildFlowDel());
238 public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
239 // TODO: optimize using the ApplicationId
240 removeFlowRule(flowRules);
244 public void executeBatch(FlowRuleBatchOperation batch) {
246 pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
248 OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
251 for (FlowRuleBatchEntry fbe : batch.getOperations()) {
252 // flow is the third party privacy flow
254 FlowRuleExtPayLoad flowRuleExtPayLoad = fbe.target().payLoad();
255 if (hasPayload(flowRuleExtPayLoad)) {
256 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
260 FlowModBuilder builder = FlowModBuilder.builder(fbe.target(), sw
261 .factory(), Optional.of(batch.id()));
262 switch (fbe.operator()) {
264 mod = builder.buildFlowAdd();
267 mod = builder.buildFlowDel();
270 mod = builder.buildFlowMod();
273 log.error("Unsupported batch operation {}; skipping flowmod {}",
274 fbe.operator(), fbe);
279 OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
281 sw.sendMsg(builder.build());
284 private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
285 return flowRuleExtPayLoad != null &&
286 flowRuleExtPayLoad.payLoad() != null &&
287 flowRuleExtPayLoad.payLoad().length > 0;
290 private class InternalFlowProvider
291 implements OpenFlowSwitchListener, OpenFlowEventListener {
294 public void switchAdded(Dpid dpid) {
295 createCollector(controller.getSwitch(dpid));
299 public void switchRemoved(Dpid dpid) {
300 FlowStatsCollector collector = collectors.remove(dpid);
301 if (collector != null) {
307 public void switchChanged(Dpid dpid) {
311 public void portChanged(Dpid dpid, OFPortStatus status) {
312 // TODO: Decide whether to evict flows internal store.
316 public void handleMessage(Dpid dpid, OFMessage msg) {
317 OpenFlowSwitch sw = controller.getSwitch(dpid);
318 switch (msg.getType()) {
320 OFFlowRemoved removed = (OFFlowRemoved) msg;
322 FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
323 providerService.flowRemoved(fr);
326 if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
327 pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
332 InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
335 .batchOperationCompleted(msg.getXid(),
338 log.warn("Received unknown Barrier Reply: {}",
342 pendingBatches.invalidate(msg.getXid());
346 // TODO: This needs to get suppressed in a better way.
347 if (msg instanceof OFBadRequestErrorMsg &&
348 ((OFBadRequestErrorMsg) msg).getCode() == OFBadRequestCode.BAD_TYPE) {
349 log.debug("Received error message {} from {}", msg, dpid);
351 log.warn("Received error message {} from {}", msg, dpid);
354 OFErrorMsg error = (OFErrorMsg) msg;
355 if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
356 OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
357 if (fmFailed.getData().getParsedMessage().isPresent()) {
358 OFMessage m = fmFailed.getData().getParsedMessage().get();
359 OFFlowMod fm = (OFFlowMod) m;
360 InternalCacheEntry entry =
361 pendingBatches.getIfPresent(msg.getXid());
363 entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
365 log.error("No matching batch for this error: {}", error);
368 // FIXME: Potentially add flowtracking to avoid this message.
369 log.error("Flow installation failed but switch didn't"
370 + " tell us which one.");
375 log.debug("Unhandled message type: {}", msg.getType());
381 public void receivedRoleReply(Dpid dpid, RoleState requested,
382 RoleState response) {
383 // Do nothing here for now.
386 private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
388 DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
389 OpenFlowSwitch sw = controller.getSwitch(dpid);
391 List<FlowEntry> flowEntries = replies.getEntries().stream()
392 .map(entry -> new FlowEntryBuilder(dpid, entry).build())
393 .collect(Collectors.toList());
395 providerService.pushFlowMetrics(did, flowEntries);
400 * The internal cache entry holding the original request as well as
401 * accumulating the any failures along the way.
403 * If this entry is evicted from the cache then the entire operation is
404 * considered failed. Otherwise, only the failures reported by the device
405 * will be propagated up.
407 private class InternalCacheEntry {
409 private final FlowRuleBatchOperation operation;
410 private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
412 public InternalCacheEntry(FlowRuleBatchOperation operation) {
413 this.operation = operation;
417 * Appends a failed rule to the set of failed items.
419 * @param rule the failed rule
421 public void appendFailure(FlowRule rule) {
426 * Fails the entire batch and returns the failed operation.
428 * @return the failed operation
430 public CompletedBatchOperation failedCompletion() {
431 Set<FlowRule> fails = operation.getOperations().stream()
432 .map(op -> op.target()).collect(Collectors.toSet());
433 return new CompletedBatchOperation(false,
435 .unmodifiableSet(fails),
436 operation.deviceId());
440 * Returns the completed operation and whether the batch suceeded.
442 * @return the completed operation
444 public CompletedBatchOperation completed() {
445 return new CompletedBatchOperation(
448 .unmodifiableSet(failures),
449 operation.deviceId());