6374ca5598644eda60f6106d094009a3fd60f309
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.provider.of.flow.impl;
17
18 import com.google.common.cache.Cache;
19 import com.google.common.cache.CacheBuilder;
20 import com.google.common.cache.RemovalCause;
21 import com.google.common.cache.RemovalNotification;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24
25 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Modified;
29 import org.apache.felix.scr.annotations.Property;
30 import org.apache.felix.scr.annotations.Reference;
31 import org.apache.felix.scr.annotations.ReferenceCardinality;
32 import org.onosproject.cfg.ComponentConfigService;
33 import org.onosproject.core.ApplicationId;
34 import org.onosproject.net.DeviceId;
35 import org.onosproject.net.flow.CompletedBatchOperation;
36 import org.onosproject.net.flow.DefaultTableStatisticsEntry;
37 import org.onosproject.net.flow.FlowEntry;
38 import org.onosproject.net.flow.FlowRule;
39 import org.onosproject.net.flow.FlowRuleBatchEntry;
40 import org.onosproject.net.flow.FlowRuleBatchOperation;
41 import org.onosproject.net.flow.FlowRuleExtPayLoad;
42 import org.onosproject.net.flow.FlowRuleProvider;
43 import org.onosproject.net.flow.FlowRuleProviderRegistry;
44 import org.onosproject.net.flow.FlowRuleProviderService;
45 import org.onosproject.net.flow.TableStatisticsEntry;
46 import org.onosproject.net.provider.AbstractProvider;
47 import org.onosproject.net.provider.ProviderId;
48 import org.onosproject.net.statistic.DefaultLoad;
49 import org.onosproject.openflow.controller.Dpid;
50 import org.onosproject.openflow.controller.OpenFlowController;
51 import org.onosproject.openflow.controller.OpenFlowEventListener;
52 import org.onosproject.openflow.controller.OpenFlowSwitch;
53 import org.onosproject.openflow.controller.OpenFlowSwitchListener;
54 import org.onosproject.openflow.controller.RoleState;
55 import org.onosproject.openflow.controller.ThirdPartyMessage;
56 import org.osgi.service.component.ComponentContext;
57 import org.projectfloodlight.openflow.protocol.OFBadRequestCode;
58 import org.projectfloodlight.openflow.protocol.OFBarrierRequest;
59 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
60 import org.projectfloodlight.openflow.protocol.OFErrorType;
61 import org.projectfloodlight.openflow.protocol.OFFlowMod;
62 import org.projectfloodlight.openflow.protocol.OFFlowRemoved;
63 import org.projectfloodlight.openflow.protocol.OFFlowStatsReply;
64 import org.projectfloodlight.openflow.protocol.OFTableStatsReply;
65 import org.projectfloodlight.openflow.protocol.OFTableStatsEntry;
66 import org.projectfloodlight.openflow.protocol.OFMessage;
67 import org.projectfloodlight.openflow.protocol.OFPortStatus;
68 import org.projectfloodlight.openflow.protocol.OFStatsReply;
69 import org.projectfloodlight.openflow.protocol.OFStatsType;
70 import org.projectfloodlight.openflow.protocol.errormsg.OFBadRequestErrorMsg;
71 import org.projectfloodlight.openflow.protocol.errormsg.OFFlowModFailedErrorMsg;
72 import org.slf4j.Logger;
73
74 import java.util.Collections;
75 import java.util.Dictionary;
76 import java.util.List;
77 import java.util.Map;
78 import java.util.Objects;
79 import java.util.Optional;
80 import java.util.Set;
81 import java.util.Timer;
82 import java.util.concurrent.TimeUnit;
83 import java.util.stream.Collectors;
84
85 import static com.google.common.base.Preconditions.checkNotNull;
86 import static com.google.common.base.Strings.isNullOrEmpty;
87 import static org.onlab.util.Tools.get;
88 import static org.slf4j.LoggerFactory.getLogger;
89
90 /**
91  * Provider which uses an OpenFlow controller to detect network end-station
92  * hosts.
93  */
94 @Component(immediate = true)
95 public class OpenFlowRuleProvider extends AbstractProvider
96         implements FlowRuleProvider {
97
98     private final Logger log = getLogger(getClass());
99
100     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
101     protected FlowRuleProviderRegistry providerRegistry;
102
103     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
104     protected OpenFlowController controller;
105
106     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
107     protected ComponentConfigService cfgService;
108
109     private static final int DEFAULT_POLL_FREQUENCY = 5;
110     @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
111             label = "Frequency (in seconds) for polling flow statistics")
112     private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
113
114     private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
115     @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
116             label = "Adaptive Flow Sampling is on or off")
117     private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
118
119     private FlowRuleProviderService providerService;
120
121     private final InternalFlowProvider listener = new InternalFlowProvider();
122
123     private Cache<Long, InternalCacheEntry> pendingBatches;
124
125     private final Timer timer = new Timer("onos-openflow-collector");
126     private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
127
128     // NewAdaptiveFlowStatsCollector Set
129     private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
130     private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
131     private final Map<Dpid, TableStatisticsCollector> tableStatsCollectors = Maps.newHashMap();
132
133     /**
134      * Creates an OpenFlow host provider.
135      */
136     public OpenFlowRuleProvider() {
137         super(new ProviderId("of", "org.onosproject.provider.openflow"));
138     }
139
140     @Activate
141     public void activate(ComponentContext context) {
142         cfgService.registerProperties(getClass());
143         providerService = providerRegistry.register(this);
144         controller.addListener(listener);
145         controller.addEventListener(listener);
146
147         pendingBatches = createBatchCache();
148
149         createCollectors();
150
151         log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
152                 flowPollFrequency, adaptiveFlowSampling);
153     }
154
155     @Deactivate
156     public void deactivate(ComponentContext context) {
157         cfgService.unregisterProperties(getClass(), false);
158         stopCollectors();
159         providerRegistry.unregister(this);
160         providerService = null;
161
162         log.info("Stopped");
163     }
164
165     @Modified
166     public void modified(ComponentContext context) {
167         Dictionary<?, ?> properties = context.getProperties();
168         int newFlowPollFrequency;
169         try {
170             String s = get(properties, "flowPollFrequency");
171             newFlowPollFrequency = isNullOrEmpty(s) ? flowPollFrequency : Integer.parseInt(s.trim());
172
173         } catch (NumberFormatException | ClassCastException e) {
174             newFlowPollFrequency = flowPollFrequency;
175         }
176
177         if (newFlowPollFrequency != flowPollFrequency) {
178             flowPollFrequency = newFlowPollFrequency;
179             adjustRate();
180         }
181
182         log.info("Settings: flowPollFrequency={}", flowPollFrequency);
183
184         boolean newAdaptiveFlowSampling;
185         String s = get(properties, "adaptiveFlowSampling");
186         newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
187
188         if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
189             // stop previous collector
190             stopCollectors();
191             adaptiveFlowSampling = newAdaptiveFlowSampling;
192             // create new collectors
193             createCollectors();
194         }
195
196         log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
197     }
198
199     private Cache<Long, InternalCacheEntry> createBatchCache() {
200         return CacheBuilder.newBuilder()
201                 .expireAfterWrite(10, TimeUnit.SECONDS)
202                 .removalListener((RemovalNotification<Long, InternalCacheEntry> notification) -> {
203                     if (notification.getCause() == RemovalCause.EXPIRED) {
204                         providerService.batchOperationCompleted(notification.getKey(),
205                                                                 notification.getValue().failedCompletion());
206                     }
207                 }).build();
208     }
209
210     private void createCollectors() {
211         controller.getSwitches().forEach(this::createCollector);
212     }
213
214     private void createCollector(OpenFlowSwitch sw) {
215         if (adaptiveFlowSampling) {
216             // NewAdaptiveFlowStatsCollector Constructor
217             NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
218             fsc.start();
219             afsCollectors.put(new Dpid(sw.getId()), fsc);
220         } else {
221             FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
222             fsc.start();
223             simpleCollectors.put(new Dpid(sw.getId()), fsc);
224         }
225         TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency);
226         tsc.start();
227         tableStatsCollectors.put(new Dpid(sw.getId()), tsc);
228     }
229
230     private void stopCollectors() {
231         if (adaptiveFlowSampling) {
232             // NewAdaptiveFlowStatsCollector Destructor
233             afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
234             afsCollectors.clear();
235         } else {
236             simpleCollectors.values().forEach(FlowStatsCollector::stop);
237             simpleCollectors.clear();
238         }
239         tableStatsCollectors.values().forEach(TableStatisticsCollector::stop);
240         tableStatsCollectors.clear();
241     }
242
243     private void adjustRate() {
244         DefaultLoad.setPollInterval(flowPollFrequency);
245         if (adaptiveFlowSampling) {
246             // NewAdaptiveFlowStatsCollector calAndPollInterval
247             afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
248         } else {
249             simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
250         }
251         tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
252     }
253
254     @Override
255     public void applyFlowRule(FlowRule... flowRules) {
256         for (FlowRule flowRule : flowRules) {
257             applyRule(flowRule);
258         }
259     }
260
261     private void applyRule(FlowRule flowRule) {
262         Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
263         OpenFlowSwitch sw = controller.getSwitch(dpid);
264
265         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
266         if (hasPayload(flowRuleExtPayLoad)) {
267             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
268             sw.sendMsg(msg);
269             return;
270         }
271         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
272                                           Optional.empty()).buildFlowAdd());
273
274         if (adaptiveFlowSampling) {
275             // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
276             NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
277             if (collector != null) {
278                 collector.addWithFlowRule(flowRule);
279             }
280         }
281     }
282
283     @Override
284     public void removeFlowRule(FlowRule... flowRules) {
285         for (FlowRule flowRule : flowRules) {
286             removeRule(flowRule);
287         }
288     }
289
290     private void removeRule(FlowRule flowRule) {
291         Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
292         OpenFlowSwitch sw = controller.getSwitch(dpid);
293
294         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
295         if (hasPayload(flowRuleExtPayLoad)) {
296             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
297             sw.sendMsg(msg);
298             return;
299         }
300         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
301                                           Optional.empty()).buildFlowDel());
302
303         if (adaptiveFlowSampling) {
304             // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
305             NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
306             if (collector != null) {
307                 collector.removeFlows(flowRule);
308             }
309         }
310     }
311
312     @Override
313     public void removeRulesById(ApplicationId id, FlowRule... flowRules) {
314         // TODO: optimize using the ApplicationId
315         removeFlowRule(flowRules);
316     }
317
318     @Override
319     public void executeBatch(FlowRuleBatchOperation batch) {
320         checkNotNull(batch);
321
322         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
323
324         Dpid dpid = Dpid.dpid(batch.deviceId().uri());
325         OpenFlowSwitch sw = controller.getSwitch(dpid);
326         OFFlowMod mod;
327         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
328             // flow is the third party privacy flow
329
330             FlowRuleExtPayLoad flowRuleExtPayLoad = fbe.target().payLoad();
331             if (hasPayload(flowRuleExtPayLoad)) {
332                 OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
333                 sw.sendMsg(msg);
334                 continue;
335             }
336             FlowModBuilder builder =
337                     FlowModBuilder.builder(fbe.target(), sw.factory(), Optional.of(batch.id()));
338             NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
339             switch (fbe.operator()) {
340                 case ADD:
341                     mod = builder.buildFlowAdd();
342                     if (adaptiveFlowSampling && collector != null) {
343                         // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
344                         collector.addWithFlowRule(fbe.target());
345                     }
346                     break;
347                 case REMOVE:
348                     mod = builder.buildFlowDel();
349                     if (adaptiveFlowSampling && collector != null) {
350                         // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
351                         collector.removeFlows(fbe.target());
352                     }
353                     break;
354                 case MODIFY:
355                     mod = builder.buildFlowMod();
356                     if (adaptiveFlowSampling && collector != null) {
357                         // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
358                         // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
359                         collector.addOrUpdateFlows((FlowEntry) fbe.target());
360                     }
361                     break;
362                 default:
363                     log.error("Unsupported batch operation {}; skipping flowmod {}",
364                             fbe.operator(), fbe);
365                     continue;
366             }
367             sw.sendMsg(mod);
368         }
369         OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
370                 .setXid(batch.id());
371         sw.sendMsg(builder.build());
372     }
373
374     private boolean hasPayload(FlowRuleExtPayLoad flowRuleExtPayLoad) {
375         return flowRuleExtPayLoad != null &&
376                 flowRuleExtPayLoad.payLoad() != null &&
377                 flowRuleExtPayLoad.payLoad().length > 0;
378     }
379
380     private class InternalFlowProvider
381             implements OpenFlowSwitchListener, OpenFlowEventListener {
382
383         @Override
384         public void switchAdded(Dpid dpid) {
385
386             OpenFlowSwitch sw = controller.getSwitch(dpid);
387
388             createCollector(controller.getSwitch(dpid));
389         }
390
391         @Override
392         public void switchRemoved(Dpid dpid) {
393             if (adaptiveFlowSampling) {
394                 NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
395                 if (collector != null) {
396                     collector.stop();
397                 }
398             } else {
399                 FlowStatsCollector collector = simpleCollectors.remove(dpid);
400                 if (collector != null) {
401                     collector.stop();
402                 }
403             }
404             TableStatisticsCollector tsc = tableStatsCollectors.remove(dpid);
405             if (tsc != null) {
406                 tsc.stop();
407             }
408         }
409
410         @Override
411         public void switchChanged(Dpid dpid) {
412         }
413
414         @Override
415         public void portChanged(Dpid dpid, OFPortStatus status) {
416             // TODO: Decide whether to evict flows internal store.
417         }
418
419         @Override
420         public void handleMessage(Dpid dpid, OFMessage msg) {
421             OpenFlowSwitch sw = controller.getSwitch(dpid);
422             switch (msg.getType()) {
423                 case FLOW_REMOVED:
424                     OFFlowRemoved removed = (OFFlowRemoved) msg;
425
426                     FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
427                     providerService.flowRemoved(fr);
428
429                     if (adaptiveFlowSampling) {
430                         // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
431                         NewAdaptiveFlowStatsCollector collector = afsCollectors.get(dpid);
432                         if (collector != null) {
433                             collector.flowRemoved(fr);
434                         }
435                     }
436                     break;
437                 case STATS_REPLY:
438                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
439                         pushFlowMetrics(dpid, (OFFlowStatsReply) msg);
440                     } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
441                         pushTableStatistics(dpid, (OFTableStatsReply) msg);
442                     }
443                     break;
444                 case BARRIER_REPLY:
445                     try {
446                         InternalCacheEntry entry = pendingBatches.getIfPresent(msg.getXid());
447                         if (entry != null) {
448                             providerService
449                                     .batchOperationCompleted(msg.getXid(),
450                                                              entry.completed());
451                         } else {
452                             log.warn("Received unknown Barrier Reply: {}",
453                                      msg.getXid());
454                         }
455                     } finally {
456                         pendingBatches.invalidate(msg.getXid());
457                     }
458                     break;
459                 case ERROR:
460                     // TODO: This needs to get suppressed in a better way.
461                     if (msg instanceof OFBadRequestErrorMsg &&
462                             ((OFBadRequestErrorMsg) msg).getCode() == OFBadRequestCode.BAD_TYPE) {
463                         log.debug("Received error message {} from {}", msg, dpid);
464                     } else {
465                         log.warn("Received error message {} from {}", msg, dpid);
466                     }
467
468                     OFErrorMsg error = (OFErrorMsg) msg;
469                     if (error.getErrType() == OFErrorType.FLOW_MOD_FAILED) {
470                         OFFlowModFailedErrorMsg fmFailed = (OFFlowModFailedErrorMsg) error;
471                         if (fmFailed.getData().getParsedMessage().isPresent()) {
472                             OFMessage m = fmFailed.getData().getParsedMessage().get();
473                             OFFlowMod fm = (OFFlowMod) m;
474                             InternalCacheEntry entry =
475                                     pendingBatches.getIfPresent(msg.getXid());
476                             if (entry != null) {
477                                 entry.appendFailure(new FlowEntryBuilder(dpid, fm).build());
478                             } else {
479                                 log.error("No matching batch for this error: {}", error);
480                             }
481                         } else {
482                             // FIXME: Potentially add flowtracking to avoid this message.
483                             log.error("Flow installation failed but switch didn't"
484                                               + " tell us which one.");
485                         }
486                     }
487
488                 default:
489                     log.debug("Unhandled message type: {}", msg.getType());
490             }
491         }
492
493         @Override
494         public void receivedRoleReply(Dpid dpid, RoleState requested,
495                                       RoleState response) {
496             // Do nothing here for now.
497         }
498
499         private void pushFlowMetrics(Dpid dpid, OFFlowStatsReply replies) {
500
501             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
502
503             List<FlowEntry> flowEntries = replies.getEntries().stream()
504                     .map(entry -> new FlowEntryBuilder(dpid, entry).build())
505                     .collect(Collectors.toList());
506
507             if (adaptiveFlowSampling)  {
508                 NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
509
510                 synchronized (afsc) {
511                     if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
512                         log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
513                                         + "OFFlowStatsReply Xid={}, for {}",
514                                 afsc.getFlowMissingXid(), replies.getXid(), dpid);
515                     }
516
517                     // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
518                     if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
519                         if (afsc.getFlowMissingXid() == replies.getXid()) {
520                             // call entire flow stats update with flowMissing synchronization.
521                             // used existing pushFlowMetrics
522                             providerService.pushFlowMetrics(did, flowEntries);
523                         }
524                         // reset flowMissingXid to NO_FLOW_MISSING_XID
525                         afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
526
527                     } else {
528                         // call individual flow stats update
529                         providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
530                     }
531
532                     // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
533                     afsc.pushFlowMetrics(flowEntries);
534                 }
535             } else {
536                 // call existing entire flow stats update with flowMissing synchronization
537                 providerService.pushFlowMetrics(did, flowEntries);
538             }
539         }
540
541         private void pushTableStatistics(Dpid dpid, OFTableStatsReply replies) {
542
543             DeviceId did = DeviceId.deviceId(Dpid.uri(dpid));
544             List<TableStatisticsEntry> tableStatsEntries = replies.getEntries().stream()
545                     .map(entry -> buildTableStatistics(did, entry))
546                     .filter(Objects::nonNull)
547                     .collect(Collectors.toList());
548             providerService.pushTableStatistics(did, tableStatsEntries);
549         }
550
551         private TableStatisticsEntry buildTableStatistics(DeviceId deviceId,
552                                                           OFTableStatsEntry ofEntry) {
553             TableStatisticsEntry entry = null;
554             if (ofEntry != null) {
555                 entry = new DefaultTableStatisticsEntry(deviceId,
556                                                         ofEntry.getTableId().getValue(),
557                                                         ofEntry.getActiveCount(),
558                                                         ofEntry.getLookupCount().getValue(),
559                                                         ofEntry.getMatchedCount().getValue());
560             }
561
562             return entry;
563
564         }
565     }
566
567     /**
568      * The internal cache entry holding the original request as well as
569      * accumulating the any failures along the way.
570      * <p/>
571      * If this entry is evicted from the cache then the entire operation is
572      * considered failed. Otherwise, only the failures reported by the device
573      * will be propagated up.
574      */
575     private class InternalCacheEntry {
576
577         private final FlowRuleBatchOperation operation;
578         private final Set<FlowRule> failures = Sets.newConcurrentHashSet();
579
580         public InternalCacheEntry(FlowRuleBatchOperation operation) {
581             this.operation = operation;
582         }
583
584         /**
585          * Appends a failed rule to the set of failed items.
586          *
587          * @param rule the failed rule
588          */
589         public void appendFailure(FlowRule rule) {
590             failures.add(rule);
591         }
592
593         /**
594          * Fails the entire batch and returns the failed operation.
595          *
596          * @return the failed operation
597          */
598         public CompletedBatchOperation failedCompletion() {
599             Set<FlowRule> fails = operation.getOperations().stream()
600                     .map(op -> op.target()).collect(Collectors.toSet());
601             return new CompletedBatchOperation(false,
602                                                Collections
603                                                        .unmodifiableSet(fails),
604                                                operation.deviceId());
605         }
606
607         /**
608          * Returns the completed operation and whether the batch suceeded.
609          *
610          * @return the completed operation
611          */
612         public CompletedBatchOperation completed() {
613             return new CompletedBatchOperation(
614                     failures.isEmpty(),
615                     Collections
616                             .unmodifiableSet(failures),
617                     operation.deviceId());
618         }
619     }
620
621 }