aa8df947d517e38aa98178a3efb1a1251206e105
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.provider.of.flow.impl;
18
19 import com.google.common.base.Objects;
20 import com.google.common.collect.ImmutableSet;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import org.onosproject.net.flow.DefaultTypedFlowEntry;
24 import org.onosproject.net.flow.FlowEntry;
25 import org.onosproject.net.flow.FlowId;
26 import org.onosproject.net.flow.FlowRule;
27 import org.onosproject.net.flow.StoredFlowEntry;
28 import org.onosproject.net.flow.TypedStoredFlowEntry;
29 import org.onosproject.net.flow.instructions.Instruction;
30 import org.onosproject.net.flow.instructions.Instructions;
31 import org.onosproject.openflow.controller.OpenFlowSwitch;
32 import org.onosproject.openflow.controller.RoleState;
33 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
34 import org.projectfloodlight.openflow.protocol.match.Match;
35 import org.projectfloodlight.openflow.types.OFPort;
36 import org.projectfloodlight.openflow.types.TableId;
37 import org.slf4j.Logger;
38
39 import java.util.HashSet;
40 import java.util.List;
41 import java.util.Map;
42 import java.util.Optional;
43 import java.util.Set;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.ScheduledFuture;
47 import java.util.concurrent.TimeUnit;
48
49 import static com.google.common.base.Preconditions.checkNotNull;
50 import static org.onlab.util.Tools.groupedThreads;
51 import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
52 import static org.slf4j.LoggerFactory.getLogger;
53
54 /**
55  * Efficiently and adaptively collects flow statistics for the specified switch.
56  */
57 public class NewAdaptiveFlowStatsCollector {
58
    // Logger obtained for this instance's runtime class.
    private final Logger log = getLogger(getClass());

    // Switch whose flow statistics are requested by this collector.
    private final OpenFlowSwitch sw;

    // Scheduler (pool of 4 threads) on which the three periodic polling tasks below are run.
    private ScheduledExecutorService adaptiveFlowStatsScheduler =
            Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
    // Handles returned by the scheduler for each periodic task; used to cancel them.
    private ScheduledFuture<?> calAndShortFlowsThread;
    private ScheduledFuture<?> midFlowsThread;
    private ScheduledFuture<?> longFlowsThread;

    // Task that recalculates every flow entry's FlowLiveType and polls the short flows,
    // scheduled every calAndPollInterval seconds
    private CalAndShortFlowsTask calAndShortFlowsTask;
    // Task that polls the MID flows, scheduled every midPollInterval (2 * calAndPollInterval) seconds
    private MidFlowsTask midFlowsTask;
    // Task that polls the LONG flows, scheduled every longPollInterval (3 * calAndPollInterval) seconds
    private LongFlowsTask longFlowsTask;

    // NOTE(review): the original comment here read "must be always 0", yet the value is 1 and it is
    // used as a multiplier and as the counter increment of CalAndShortFlowsTask -- confirm intent.
    private static final int CAL_AND_POLL_TIMES = 1;
    private static final int MID_POLL_TIMES = 2;     // must be greater than or equal to 1
    private static final int LONG_POLL_TIMES = 3;    // must be greater than or equal to MID_POLL_TIMES
    //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
    // must be greater than or equal to a common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
    private static final int ENTIRE_POLL_TIMES = 6;

    // Bounds (seconds) applied to the requested poll interval in initMemberVars().
    // DEFAULT_CAL_AND_POLL_FREQUENCY is not referenced in the visible part of this file.
    private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
    private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
    private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;

    // Effective intervals in seconds, derived from the clamped poll interval in initMemberVars().
    private int calAndPollInterval; // CAL_AND_POLL_TIMES * clamped poll interval
    private int midPollInterval; // MID_POLL_TIMES * calAndPollInterval
    private int longPollInterval; // LONG_POLL_TIMES * calAndPollInterval
    // only used to decide whether a flow that has not been seen for this long is treated as removed
    private int entirePollInterval; // ENTIRE_POLL_TIMES * calAndPollInterval

    // Per-task call counters. Each task compares its counter with ENTIRE_POLL_TIMES:
    // CalAndShortFlowsTask then polls the entire flow table, the other two skip their own poll.
    private int callCountCalAndShortFlowsTask = 0; // increased by CAL_AND_POLL_TIMES on each run
    private int callCountMidFlowsTask = 0;   // increased by MID_POLL_TIMES on each run
    private int callCountLongFlowsTask = 0;  // increased by LONG_POLL_TIMES on each run

    // Internal table of the typed flow entries tracked for this switch.
    private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();

    // True until the first run of CalAndShortFlowsTask after start(), which polls the entire table.
    private boolean isFirstTimeStart = true;

    // Xid of the outstanding "all flows" stats request, or NO_FLOW_MISSING_XID when there is none.
    public static final long NO_FLOW_MISSING_XID = (-1);
    private long flowMissingXid = NO_FLOW_MISSING_XID;
105
106     /**
107      * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
108      *
109      * @param sw           switch to pull
110      * @param pollInterval cal and immediate poll frequency in seconds
111      */
112     NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
113         this.sw = sw;
114
115         initMemberVars(pollInterval);
116     }
117
118     // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
119     private void initMemberVars(int pollInterval) {
120         if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
121             this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
122         } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
123             this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
124         } else {
125             this.calAndPollInterval = pollInterval;
126         }
127
128         calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
129         midPollInterval = MID_POLL_TIMES * calAndPollInterval;
130         longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
131         entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
132
133         callCountCalAndShortFlowsTask = 0;
134         callCountMidFlowsTask = 0;
135         callCountLongFlowsTask = 0;
136
137         flowMissingXid = NO_FLOW_MISSING_XID;
138     }
139
140     /**
141      * Adjusts adaptive poll frequency.
142      *
143      * @param pollInterval poll frequency in seconds
144      */
145     synchronized void adjustCalAndPollInterval(int pollInterval) {
146         initMemberVars(pollInterval);
147
148         if (calAndShortFlowsThread != null) {
149             calAndShortFlowsThread.cancel(false);
150         }
151         if (midFlowsThread != null) {
152             midFlowsThread.cancel(false);
153         }
154         if (longFlowsThread != null) {
155             longFlowsThread.cancel(false);
156         }
157
158         calAndShortFlowsTask = new CalAndShortFlowsTask();
159         calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
160                 calAndShortFlowsTask,
161                 0,
162                 calAndPollInterval,
163                 TimeUnit.SECONDS);
164
165         midFlowsTask = new MidFlowsTask();
166         midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
167                 midFlowsTask,
168                 0,
169                 midPollInterval,
170                 TimeUnit.SECONDS);
171
172         longFlowsTask = new LongFlowsTask();
173         longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
174                 longFlowsTask,
175                 0,
176                 longPollInterval,
177                 TimeUnit.SECONDS);
178
179         log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
180     }
181
182     private class CalAndShortFlowsTask implements Runnable {
183         @Override
184         public void run() {
185             if (sw.getRole() == RoleState.MASTER) {
186                 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
187
188                 if (isFirstTimeStart) {
189                     // isFirstTimeStart, get entire flow stats from a given switch sw
190                     log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
191                             sw.getStringId());
192                     ofFlowStatsRequestAllSend();
193
194                     callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
195                     isFirstTimeStart = false;
196                 } else  if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
197                     // entire_poll_times, get entire flow stats from a given switch sw
198                     log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
199                     ofFlowStatsRequestAllSend();
200
201                     callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
202                     //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
203                     //
204                 } else {
205                     calAndShortFlowsTaskInternal();
206                     callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
207                 }
208             }
209         }
210     }
211
212     // send openflow flow stats request message with getting all flow entries to a given switch sw
213     private void ofFlowStatsRequestAllSend() {
214         OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
215                 .setMatch(sw.factory().matchWildcardAll())
216                 .setTableId(TableId.ALL)
217                 .setOutPort(OFPort.NO_MASK)
218                 .build();
219
220         synchronized (this) {
221             // set the request xid to check the reply in OpenFlowRuleProvider
222             // After processing the reply of this request message,
223             // this must be set to NO_FLOW_MISSING_XID(-1) by provider
224             setFlowMissingXid(request.getXid());
225             log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
226
227             sw.sendMsg(request);
228         }
229     }
230
231     // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
232     private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
233         // set find match
234         Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
235                 Optional.empty()).buildMatch();
236         // set find tableId
237         TableId tableId = TableId.of(fe.tableId());
238         // set output port
239         Instruction ins = fe.treatment().allInstructions().stream()
240                 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
241                 .findFirst()
242                 .orElse(null);
243         OFPort ofPort = OFPort.NO_MASK;
244         if (ins != null) {
245             Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246             ofPort = OFPort.of((int) ((out.port().toLong())));
247         }
248
249         OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
250                 .setMatch(match)
251                 .setTableId(tableId)
252                 .setOutPort(ofPort)
253                 .build();
254
255         synchronized (this) {
256             if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257                 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258                                 + " set no flow missing xid anyway, for {}",
259                         sw.getStringId());
260                 setFlowMissingXid(NO_FLOW_MISSING_XID);
261             }
262
263             sw.sendMsg(request);
264         }
265     }
266
267     private void calAndShortFlowsTaskInternal() {
268         deviceFlowTable.checkAndMoveLiveFlowAll();
269
270         deviceFlowTable.getShortFlows().forEach(fe -> {
271             ofFlowStatsRequestFlowSend(fe);
272         });
273     }
274
275     private class MidFlowsTask implements Runnable {
276         @Override
277         public void run() {
278             if (sw.getRole() == RoleState.MASTER) {
279                 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
280
281                 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282                 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283                     callCountMidFlowsTask = MID_POLL_TIMES;
284                 } else {
285                     midFlowsTaskInternal();
286                     callCountMidFlowsTask += MID_POLL_TIMES;
287                 }
288             }
289         }
290     }
291
292     private void midFlowsTaskInternal() {
293         deviceFlowTable.getMidFlows().forEach(fe -> {
294             ofFlowStatsRequestFlowSend(fe);
295         });
296     }
297
298     private class LongFlowsTask implements Runnable {
299         @Override
300         public void run() {
301             if (sw.getRole() == RoleState.MASTER) {
302                 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
303
304                 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305                 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306                     callCountLongFlowsTask = LONG_POLL_TIMES;
307                 } else {
308                     longFlowsTaskInternal();
309                     callCountLongFlowsTask += LONG_POLL_TIMES;
310                 }
311             }
312         }
313     }
314
315     private void longFlowsTaskInternal() {
316         deviceFlowTable.getLongFlows().forEach(fe -> {
317             ofFlowStatsRequestFlowSend(fe);
318         });
319     }
320
321     /**
322      * start adaptive flow statistic collection.
323      *
324      */
325     public synchronized void start() {
326         log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327         callCountCalAndShortFlowsTask = 0;
328         callCountMidFlowsTask = 0;
329         callCountLongFlowsTask = 0;
330
331         isFirstTimeStart = true;
332
333         // Initially start polling quickly. Then drop down to configured value
334         calAndShortFlowsTask = new CalAndShortFlowsTask();
335         calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336                 calAndShortFlowsTask,
337                 1,
338                 calAndPollInterval,
339                 TimeUnit.SECONDS);
340
341         midFlowsTask = new MidFlowsTask();
342         midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
343                 midFlowsTask,
344                 1,
345                 midPollInterval,
346                 TimeUnit.SECONDS);
347
348         longFlowsTask = new LongFlowsTask();
349         longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
350                 longFlowsTask,
351                 1,
352                 longPollInterval,
353                 TimeUnit.SECONDS);
354
355         log.info("Started");
356     }
357
358     /**
359      * stop adaptive flow statistic collection.
360      *
361      */
362     public synchronized void stop() {
363         log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364         if (calAndShortFlowsThread != null) {
365             calAndShortFlowsThread.cancel(true);
366         }
367         if (midFlowsThread != null) {
368             midFlowsThread.cancel(true);
369         }
370         if (longFlowsThread != null) {
371             longFlowsThread.cancel(true);
372         }
373
374         adaptiveFlowStatsScheduler.shutdownNow();
375
376         isFirstTimeStart = false;
377
378         log.info("Stopped");
379     }
380
381     /**
382      * add typed flow entry from flow rule into the internal flow table.
383      *
384      * @param flowRules the flow rules
385      *
386      */
387     public synchronized void addWithFlowRule(FlowRule... flowRules) {
388         for (FlowRule fr : flowRules) {
389             // First remove old entry unconditionally, if exist
390             deviceFlowTable.remove(fr);
391
392             // add new flow entry, we suppose IMMEDIATE_FLOW
393             TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394                     FlowLiveType.IMMEDIATE_FLOW);
395             deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
396         }
397     }
398
399     /**
400      * add or update typed flow entry from flow entry into the internal flow table.
401      *
402      * @param flowEntries the flow entries
403      *
404      */
405     public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406        for (FlowEntry fe : flowEntries) {
407            // check if this new rule is an update to an existing entry
408            TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
409
410            if (stored != null) {
411                // duplicated flow entry is collected!, just skip
412                if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413                        && fe.life() == stored.life()) {
414                    log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415                                    + ",is DUPLICATED stats collection, just skip."
416                                    + " AdaptiveStats collection thread for {}",
417                            sw.getStringId());
418
419                    stored.setLastSeen();
420                    continue;
421                } else if (fe.life() < stored.life()) {
422                    // Invalid updates the stats values, i.e., bytes, packets, durations ...
423                    log.debug("addOrUpdateFlows():" +
424                                " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425                                " new flowId=" + Long.toHexString(fe.id().value()) +
426                                ", old flowId=" + Long.toHexString(stored.id().value()) +
427                                ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428                                ", new life=" + fe.life() + ", old life=" + stored.life() +
429                                ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
430                    // go next
431                    stored.setLastSeen();
432                    continue;
433                }
434
435                // update now
436                stored.setLife(fe.life());
437                stored.setPackets(fe.packets());
438                stored.setBytes(fe.bytes());
439                stored.setLastSeen();
440                if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441                    // flow is really RULE_ADDED
442                    stored.setState(FlowEntry.FlowEntryState.ADDED);
443                }
444                // flow is RULE_UPDATED, skip adding and just updating flow live table
445                //deviceFlowTable.calAndSetFlowLiveType(stored);
446                continue;
447            }
448
449            // add new flow entry, we suppose IMMEDIATE_FLOW
450            TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451                     FlowLiveType.IMMEDIATE_FLOW);
452            deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
453         }
454     }
455
456     /**
457      * remove typed flow entry from the internal flow table.
458      *
459      * @param flowRules the flow entries
460      *
461      */
462     public synchronized void removeFlows(FlowRule...  flowRules) {
463         for (FlowRule rule : flowRules) {
464             deviceFlowTable.remove(rule);
465         }
466     }
467
468     // same as removeFlows() function
469     /**
470      * remove typed flow entry from the internal flow table.
471      *
472      * @param flowRules the flow entries
473      *
474      */
475     public void flowRemoved(FlowRule... flowRules) {
476         removeFlows(flowRules);
477     }
478
479     // same as addOrUpdateFlows() function
480     /**
481      * add or update typed flow entry from flow entry into the internal flow table.
482      *
483      * @param flowEntries the flow entry list
484      *
485      */
486     public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487         flowEntries.forEach(fe -> {
488             addOrUpdateFlows(fe);
489         });
490     }
491
492     /**
493      * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
494      *
495      * @return xid of missing flow
496      */
497     public long getFlowMissingXid() {
498         return flowMissingXid;
499     }
500
501     /**
502      * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
503      *
504      * @param flowMissingXid the OFFlowStatsRequest message Id
505      *
506      */
507     public void setFlowMissingXid(long flowMissingXid) {
508         this.flowMissingXid = flowMissingXid;
509     }
510
511     private class InternalDeviceFlowTable {
512
        // All tracked typed flow entries, grouped by flow id (concurrent map).
        private final Map<FlowId, Set<TypedStoredFlowEntry>>
                flowEntries = Maps.newConcurrentMap();

        // Entries currently classified as SHORT_FLOW, MID_FLOW and LONG_FLOW respectively.
        // NOTE(review): these are plain HashSets read by the polling tasks -- confirm access is guarded.
        private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
        private final Set<StoredFlowEntry> midFlows = new HashSet<>();
        private final Set<StoredFlowEntry> longFlows = new HashSet<>();

        // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
        private final long latencyFlowStatsRequestAndReplyMillis = 500;


        // Statistics for table operation: successful add(), successful
        // addWithCalAndSetFlowLiveType(), and a removal counter.
        private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
        private long removeCount = 0;
527
528         /**
529          * Resets all count values with zero.
530          *
531          */
532         public void resetAllCount() {
533             addCount = 0;
534             addWithSetFlowLiveTypeCount = 0;
535             removeCount = 0;
536         }
537
538         // get set of flow entries for the given flowId
539         private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
540             return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
541         }
542
543         // get flow entry for the given flow rule
544         private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
545             Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
546             return flowEntries.stream()
547                     .filter(entry -> Objects.equal(entry, rule))
548                     .findAny()
549                     .orElse(null);
550         }
551
552         // get the flow entries for all flows in flow table
553         private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
554             Set<TypedStoredFlowEntry> result = Sets.newHashSet();
555
556             flowEntries.values().forEach(result::addAll);
557             return result;
558         }
559
560         /**
561          * Gets the number of flow entry in flow table.
562          *
563          * @return the number of flow entry.
564          *
565          */
566         public long getFlowCount() {
567             return flowEntries.values().stream().mapToLong(Set::size).sum();
568         }
569
570         /**
571          * Gets the number of flow entry in flow table.
572          *
573          * @param rule the flow rule
574          * @return the typed flow entry.
575          *
576          */
577         public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
578             checkNotNull(rule);
579
580             return getFlowEntryInternal(rule);
581         }
582
583         /**
584          * Gets the all typed flow entries in flow table.
585          *
586          * @return the set of typed flow entry.
587          *
588          */
589         public Set<TypedStoredFlowEntry> getFlowEntries() {
590             return getFlowEntriesInternal();
591         }
592
593         /**
594          * Gets the short typed flow entries in flow table.
595          *
596          * @return the set of typed flow entry.
597          *
598          */
599         public Set<StoredFlowEntry> getShortFlows() {
600             return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
601         }
602
603         /**
604          * Gets the mid typed flow entries in flow table.
605          *
606          * @return the set of typed flow entry.
607          *
608          */
609         public Set<StoredFlowEntry> getMidFlows() {
610             return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
611         }
612
613         /**
614          * Gets the long typed flow entries in flow table.
615          *
616          * @return the set of typed flow entry.
617          *
618          */
619         public Set<StoredFlowEntry> getLongFlows() {
620             return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
621         }
622
623         /**
624          * Add typed flow entry into table only.
625          *
626          * @param rule the flow rule
627          *
628          */
629         public synchronized void add(TypedStoredFlowEntry rule) {
630             checkNotNull(rule);
631
632             //rule have to be new DefaultTypedFlowEntry
633             boolean result = getFlowEntriesInternal(rule.id()).add(rule);
634
635             if (result) {
636                 addCount++;
637             }
638         }
639
640         /**
641          * Calculates and set the flow live type at the first time,
642          * and then add it into a corresponding typed flow table.
643          *
644          * @param rule the flow rule
645          *
646          */
647         public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
648             checkNotNull(rule);
649
650             calAndSetFlowLiveTypeInternal(rule);
651         }
652
653         /**
654          * Add the typed flow entry into table, and calculates and set the flow live type,
655          * and then add it into a corresponding typed flow table.
656          *
657          * @param rule the flow rule
658          *
659          */
660        public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
661             checkNotNull(rule);
662
663             //rule have to be new DefaultTypedFlowEntry
664             boolean result = getFlowEntriesInternal(rule.id()).add(rule);
665             if (result) {
666                 calAndSetFlowLiveTypeInternal(rule);
667                 addWithSetFlowLiveTypeCount++;
668             } else {
669                 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
670                                 + " ADD Failed, cause it may already exists in table !!!,"
671                                 + " AdaptiveStats collection thread for {}",
672                         sw.getStringId());
673             }
674         }
675
676         // In real, calculates and set the flow live type at the first time,
677         // and then add it into a corresponding typed flow table
678         private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
679             long life = rule.life();
680             FlowLiveType prevFlowLiveType = rule.flowLiveType();
681
682             if (life >= longPollInterval) {
683                 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
684                 longFlows.add(rule);
685             } else if (life >= midPollInterval) {
686                 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
687                 midFlows.add(rule);
688             } else if (life >= calAndPollInterval) {
689                 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
690                 shortFlows.add(rule);
691             } else if (life >= 0) {
692                 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
693             } else { // life < 0
694                 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
695             }
696
697             if (rule.flowLiveType() != prevFlowLiveType) {
698                 switch (prevFlowLiveType) {
699                     // delete it from previous flow table
700                     case SHORT_FLOW:
701                         shortFlows.remove(rule);
702                         break;
703                     case MID_FLOW:
704                         midFlows.remove(rule);
705                         break;
706                     case LONG_FLOW:
707                         longFlows.remove(rule);
708                         break;
709                     default:
710                         break;
711                 }
712             }
713         }
714
715
716         // check the flow live type based on current time, then set and add it into corresponding table
717         private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
718             long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
719             // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
720             long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
721             // fe.life() unit is SECOND!
722             long liveTime = fe.life() + fromLastSeen;
723
724
725             switch (fe.flowLiveType()) {
726                 case IMMEDIATE_FLOW:
727                     if (liveTime >= longPollInterval) {
728                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
729                          longFlows.add(fe);
730                     } else if (liveTime >= midPollInterval) {
731                         fe.setFlowLiveType(FlowLiveType.MID_FLOW);
732                         midFlows.add(fe);
733                     } else if (liveTime >= calAndPollInterval) {
734                         fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
735                         shortFlows.add(fe);
736                     }
737                     break;
738                 case SHORT_FLOW:
739                     if (liveTime >= longPollInterval) {
740                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
741                         shortFlows.remove(fe);
742                         longFlows.add(fe);
743                     } else if (liveTime >= midPollInterval) {
744                         fe.setFlowLiveType(FlowLiveType.MID_FLOW);
745                         shortFlows.remove(fe);
746                         midFlows.add(fe);
747                     }
748                     break;
749                 case MID_FLOW:
750                     if (liveTime >= longPollInterval) {
751                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
752                         midFlows.remove(fe);
753                         longFlows.add(fe);
754                     }
755                     break;
756                 case LONG_FLOW:
757                     if (fromLastSeen > entirePollInterval) {
758                         log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
759                         return false;
760                     }
761                     break;
762                 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
763                 default :
764                     // Error Unknown Live Type
765                     log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
766                             + "AdaptiveStats collection thread for {}",
767                             sw.getStringId());
768                     return false;
769             }
770
771             log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
772                             + ", state=" + fe.state()
773                             + ", After liveType=" + fe.flowLiveType()
774                             + ", liveTime=" + liveTime
775                             + ", life=" + fe.life()
776                             + ", bytes=" + fe.bytes()
777                             + ", packets=" + fe.packets()
778                             + ", fromLastSeen=" + fromLastSeen
779                             + ", priority=" + fe.priority()
780                             + ", selector=" + fe.selector().criteria()
781                             + ", treatment=" + fe.treatment()
782                             + " AdaptiveStats collection thread for {}",
783                     sw.getStringId());
784
785             return true;
786         }
787
788         /**
789          * Check and move live type for all type flow entries in table at every calAndPollInterval time.
790          *
791          */
792         public void checkAndMoveLiveFlowAll() {
793             Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
794
795             long calCurTime = System.currentTimeMillis();
796             typedFlowEntries.forEach(fe -> {
797                 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
798                     remove(fe);
799                 }
800             });
801
802             // print table counts for debug
803             if (log.isDebugEnabled()) {
804                 synchronized (this) {
805                     long totalFlowCount = getFlowCount();
806                     long shortFlowCount = shortFlows.size();
807                     long midFlowCount = midFlows.size();
808                     long longFlowCount = longFlows.size();
809                     long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
810                     long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
811
812                     log.debug("--------------------------------------------------------------------------- for {}",
813                             sw.getStringId());
814                     log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
815                             + ", add - remove_Count=" + calTotalCount
816                             + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
817                             + ", SHORT_FLOW_Count=" + shortFlowCount
818                             + ", MID_FLOW_Count=" + midFlowCount
819                             + ", LONG_FLOW_Count=" + longFlowCount
820                             + ", add_Count=" + addCount
821                             + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
822                             + ", remove_Count=" + removeCount
823                             + " AdaptiveStats collection thread for {}", sw.getStringId());
824                     log.debug("--------------------------------------------------------------------------- for {}",
825                             sw.getStringId());
826                     if (totalFlowCount != calTotalCount) {
827                         log.error("checkAndMoveLiveFlowAll, Real total flow count and "
828                                 + "calculated total flow count do NOT match, something is wrong internally "
829                                 + "or check counter value bound is over!");
830                     }
831                     if (immediateFlowCount < 0) {
832                         log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
833                                 + "something is wrong internally "
834                                 + "or check counter value bound is over!");
835                     }
836                 }
837             }
838             log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
839         }
840
841         /**
842          * Remove the typed flow entry from table.
843          *
844          * @param rule the flow rule
845          *
846          */
847         public synchronized void remove(FlowRule rule) {
848             checkNotNull(rule);
849
850             TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
851             if (removeStore != null) {
852                 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
853                 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
854
855                 if (result) {
856                     removeCount++;
857                 }
858             }
859        }
860
861         // Remove the typed flow entry from corresponding table
862         private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
863             switch (fe.flowLiveType()) {
864                 case IMMEDIATE_FLOW:
865                     // do nothing
866                     break;
867                 case SHORT_FLOW:
868                     shortFlows.remove(fe);
869                     break;
870                 case MID_FLOW:
871                     midFlows.remove(fe);
872                     break;
873                 case LONG_FLOW:
874                     longFlows.remove(fe);
875                     break;
876                 default: // error in Flow Live Type
877                     log.error("removeLiveFlowsInternal, Unknown Live Type error!");
878                     break;
879             }
880         }
881     }
882 }