2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
17 package org.onosproject.provider.of.flow.impl;
19 import com.google.common.base.Objects;
20 import com.google.common.collect.ImmutableSet;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23 import org.onosproject.net.flow.DefaultTypedFlowEntry;
24 import org.onosproject.net.flow.FlowEntry;
25 import org.onosproject.net.flow.FlowId;
26 import org.onosproject.net.flow.FlowRule;
27 import org.onosproject.net.flow.StoredFlowEntry;
28 import org.onosproject.net.flow.TypedStoredFlowEntry;
29 import org.onosproject.net.flow.instructions.Instruction;
30 import org.onosproject.net.flow.instructions.Instructions;
31 import org.onosproject.openflow.controller.OpenFlowSwitch;
32 import org.onosproject.openflow.controller.RoleState;
33 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
34 import org.projectfloodlight.openflow.protocol.match.Match;
35 import org.projectfloodlight.openflow.types.OFPort;
36 import org.projectfloodlight.openflow.types.TableId;
37 import org.slf4j.Logger;
39 import java.util.HashSet;
40 import java.util.List;
42 import java.util.Optional;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.ScheduledExecutorService;
46 import java.util.concurrent.ScheduledFuture;
47 import java.util.concurrent.TimeUnit;
49 import static com.google.common.base.Preconditions.checkNotNull;
50 import static org.onlab.util.Tools.groupedThreads;
51 import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
52 import static org.slf4j.LoggerFactory.getLogger;
55 * Efficiently and adaptively collects flow statistics for the specified switch.
57 public class NewAdaptiveFlowStatsCollector {
59 private final Logger log = getLogger(getClass());
61 private final OpenFlowSwitch sw;
63 private ScheduledExecutorService adaptiveFlowStatsScheduler =
64 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
65 private ScheduledFuture<?> calAndShortFlowsThread;
66 private ScheduledFuture<?> midFlowsThread;
67 private ScheduledFuture<?> longFlowsThread;
// Task that calculates all flowEntries' FlowLiveType and collects stats of SHORT flows every calAndPollInterval
70 private CalAndShortFlowsTask calAndShortFlowsTask;
71 // Task that collects stats MID flows every 2*calAndPollInterval
72 private MidFlowsTask midFlowsTask;
73 // Task that collects stats LONG flows every 3*calAndPollInterval
74 private LongFlowsTask longFlowsTask;
private static final int CAL_AND_POLL_TIMES = 1; // must always be 1 (0 would zero every derived poll interval)
77 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
78 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
79 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
80 // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
81 private static final int ENTIRE_POLL_TIMES = 6;
83 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
84 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
85 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
87 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
88 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
89 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
90 // only used for checking condition at each task if it collects entire flows from a given switch or not
91 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
93 // Number of call count of each Task,
94 // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
95 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
96 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
97 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
99 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
101 private boolean isFirstTimeStart = true;
103 public static final long NO_FLOW_MISSING_XID = (-1);
104 private long flowMissingXid = NO_FLOW_MISSING_XID;
/**
 * Builds an adaptive flow statistics collector bound to a single switch.
 * The requested interval is validated and all derived poll intervals are
 * computed by {@code initMemberVars}.
 *
 * @param sw           switch to pull
 * @param pollInterval cal and immediate poll frequency in seconds
 */
NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
    this.sw = sw;
    this.initMemberVars(pollInterval);
}
118 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
119 private void initMemberVars(int pollInterval) {
120 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
121 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
122 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
123 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
125 this.calAndPollInterval = pollInterval;
128 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
129 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
130 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
131 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
133 callCountCalAndShortFlowsTask = 0;
134 callCountMidFlowsTask = 0;
135 callCountLongFlowsTask = 0;
137 flowMissingXid = NO_FLOW_MISSING_XID;
141 * Adjusts adaptive poll frequency.
143 * @param pollInterval poll frequency in seconds
145 synchronized void adjustCalAndPollInterval(int pollInterval) {
146 initMemberVars(pollInterval);
148 if (calAndShortFlowsThread != null) {
149 calAndShortFlowsThread.cancel(false);
151 if (midFlowsThread != null) {
152 midFlowsThread.cancel(false);
154 if (longFlowsThread != null) {
155 longFlowsThread.cancel(false);
158 calAndShortFlowsTask = new CalAndShortFlowsTask();
159 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
160 calAndShortFlowsTask,
165 midFlowsTask = new MidFlowsTask();
166 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
172 longFlowsTask = new LongFlowsTask();
173 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
179 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
182 private class CalAndShortFlowsTask implements Runnable {
185 if (sw.getRole() == RoleState.MASTER) {
186 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
188 if (isFirstTimeStart) {
189 // isFirstTimeStart, get entire flow stats from a given switch sw
190 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
192 ofFlowStatsRequestAllSend();
194 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
195 isFirstTimeStart = false;
196 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
197 // entire_poll_times, get entire flow stats from a given switch sw
198 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
199 ofFlowStatsRequestAllSend();
201 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
202 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
205 calAndShortFlowsTaskInternal();
206 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
212 // send openflow flow stats request message with getting all flow entries to a given switch sw
213 private void ofFlowStatsRequestAllSend() {
214 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
215 .setMatch(sw.factory().matchWildcardAll())
216 .setTableId(TableId.ALL)
217 .setOutPort(OFPort.NO_MASK)
220 synchronized (this) {
221 // set the request xid to check the reply in OpenFlowRuleProvider
222 // After processing the reply of this request message,
223 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
224 setFlowMissingXid(request.getXid());
225 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
231 // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
232 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
234 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
235 Optional.empty()).buildMatch();
237 TableId tableId = TableId.of(fe.tableId());
239 Instruction ins = fe.treatment().allInstructions().stream()
240 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
243 OFPort ofPort = OFPort.NO_MASK;
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
246 ofPort = OFPort.of((int) ((out.port().toLong())));
249 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
255 synchronized (this) {
256 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
257 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
258 + " set no flow missing xid anyway, for {}",
260 setFlowMissingXid(NO_FLOW_MISSING_XID);
267 private void calAndShortFlowsTaskInternal() {
268 deviceFlowTable.checkAndMoveLiveFlowAll();
270 deviceFlowTable.getShortFlows().forEach(fe -> {
271 ofFlowStatsRequestFlowSend(fe);
275 private class MidFlowsTask implements Runnable {
278 if (sw.getRole() == RoleState.MASTER) {
279 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
281 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
282 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
283 callCountMidFlowsTask = MID_POLL_TIMES;
285 midFlowsTaskInternal();
286 callCountMidFlowsTask += MID_POLL_TIMES;
292 private void midFlowsTaskInternal() {
293 deviceFlowTable.getMidFlows().forEach(fe -> {
294 ofFlowStatsRequestFlowSend(fe);
298 private class LongFlowsTask implements Runnable {
301 if (sw.getRole() == RoleState.MASTER) {
302 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
304 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
305 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
306 callCountLongFlowsTask = LONG_POLL_TIMES;
308 longFlowsTaskInternal();
309 callCountLongFlowsTask += LONG_POLL_TIMES;
315 private void longFlowsTaskInternal() {
316 deviceFlowTable.getLongFlows().forEach(fe -> {
317 ofFlowStatsRequestFlowSend(fe);
322 * start adaptive flow statistic collection.
325 public synchronized void start() {
326 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
327 callCountCalAndShortFlowsTask = 0;
328 callCountMidFlowsTask = 0;
329 callCountLongFlowsTask = 0;
331 isFirstTimeStart = true;
333 // Initially start polling quickly. Then drop down to configured value
334 calAndShortFlowsTask = new CalAndShortFlowsTask();
335 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
336 calAndShortFlowsTask,
341 midFlowsTask = new MidFlowsTask();
342 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
348 longFlowsTask = new LongFlowsTask();
349 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
359 * stop adaptive flow statistic collection.
362 public synchronized void stop() {
363 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
364 if (calAndShortFlowsThread != null) {
365 calAndShortFlowsThread.cancel(true);
367 if (midFlowsThread != null) {
368 midFlowsThread.cancel(true);
370 if (longFlowsThread != null) {
371 longFlowsThread.cancel(true);
374 adaptiveFlowStatsScheduler.shutdownNow();
376 isFirstTimeStart = false;
382 * add typed flow entry from flow rule into the internal flow table.
384 * @param flowRules the flow rules
387 public synchronized void addWithFlowRule(FlowRule... flowRules) {
388 for (FlowRule fr : flowRules) {
389 // First remove old entry unconditionally, if exist
390 deviceFlowTable.remove(fr);
392 // add new flow entry, we suppose IMMEDIATE_FLOW
393 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
394 FlowLiveType.IMMEDIATE_FLOW);
395 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
400 * add or update typed flow entry from flow entry into the internal flow table.
402 * @param flowEntries the flow entries
405 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
406 for (FlowEntry fe : flowEntries) {
407 // check if this new rule is an update to an existing entry
408 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
410 if (stored != null) {
411 // duplicated flow entry is collected!, just skip
412 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
413 && fe.life() == stored.life()) {
414 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
415 + ",is DUPLICATED stats collection, just skip."
416 + " AdaptiveStats collection thread for {}",
419 stored.setLastSeen();
421 } else if (fe.life() < stored.life()) {
422 // Invalid updates the stats values, i.e., bytes, packets, durations ...
423 log.debug("addOrUpdateFlows():" +
424 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
425 " new flowId=" + Long.toHexString(fe.id().value()) +
426 ", old flowId=" + Long.toHexString(stored.id().value()) +
427 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
428 ", new life=" + fe.life() + ", old life=" + stored.life() +
429 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
431 stored.setLastSeen();
436 stored.setLife(fe.life());
437 stored.setPackets(fe.packets());
438 stored.setBytes(fe.bytes());
439 stored.setLastSeen();
440 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
441 // flow is really RULE_ADDED
442 stored.setState(FlowEntry.FlowEntryState.ADDED);
444 // flow is RULE_UPDATED, skip adding and just updating flow live table
445 //deviceFlowTable.calAndSetFlowLiveType(stored);
449 // add new flow entry, we suppose IMMEDIATE_FLOW
450 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
451 FlowLiveType.IMMEDIATE_FLOW);
452 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
457 * remove typed flow entry from the internal flow table.
459 * @param flowRules the flow entries
462 public synchronized void removeFlows(FlowRule... flowRules) {
463 for (FlowRule rule : flowRules) {
464 deviceFlowTable.remove(rule);
468 // same as removeFlows() function
470 * remove typed flow entry from the internal flow table.
472 * @param flowRules the flow entries
475 public void flowRemoved(FlowRule... flowRules) {
476 removeFlows(flowRules);
479 // same as addOrUpdateFlows() function
481 * add or update typed flow entry from flow entry into the internal flow table.
483 * @param flowEntries the flow entry list
486 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
487 flowEntries.forEach(fe -> {
488 addOrUpdateFlows(fe);
493 * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
495 * @return xid of missing flow
497 public long getFlowMissingXid() {
498 return flowMissingXid;
502 * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
504 * @param flowMissingXid the OFFlowStatsRequest message Id
507 public void setFlowMissingXid(long flowMissingXid) {
508 this.flowMissingXid = flowMissingXid;
511 private class InternalDeviceFlowTable {
513 private final Map<FlowId, Set<TypedStoredFlowEntry>>
514 flowEntries = Maps.newConcurrentMap();
516 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
517 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
518 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
520 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
521 private final long latencyFlowStatsRequestAndReplyMillis = 500;
524 // Statistics for table operation
525 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
526 private long removeCount = 0;
529 * Resets all count values with zero.
532 public void resetAllCount() {
534 addWithSetFlowLiveTypeCount = 0;
538 // get set of flow entries for the given flowId
539 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
540 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
543 // get flow entry for the given flow rule
544 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
545 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
546 return flowEntries.stream()
547 .filter(entry -> Objects.equal(entry, rule))
552 // get the flow entries for all flows in flow table
553 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
554 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
556 flowEntries.values().forEach(result::addAll);
561 * Gets the number of flow entry in flow table.
563 * @return the number of flow entry.
566 public long getFlowCount() {
567 return flowEntries.values().stream().mapToLong(Set::size).sum();
 * Gets the typed flow entry stored in the flow table for the given flow rule.
573 * @param rule the flow rule
574 * @return the typed flow entry.
577 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
580 return getFlowEntryInternal(rule);
584 * Gets the all typed flow entries in flow table.
586 * @return the set of typed flow entry.
589 public Set<TypedStoredFlowEntry> getFlowEntries() {
590 return getFlowEntriesInternal();
594 * Gets the short typed flow entries in flow table.
596 * @return the set of typed flow entry.
599 public Set<StoredFlowEntry> getShortFlows() {
600 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
604 * Gets the mid typed flow entries in flow table.
606 * @return the set of typed flow entry.
609 public Set<StoredFlowEntry> getMidFlows() {
610 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
614 * Gets the long typed flow entries in flow table.
616 * @return the set of typed flow entry.
619 public Set<StoredFlowEntry> getLongFlows() {
620 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
624 * Add typed flow entry into table only.
626 * @param rule the flow rule
629 public synchronized void add(TypedStoredFlowEntry rule) {
632 //rule have to be new DefaultTypedFlowEntry
633 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
641 * Calculates and set the flow live type at the first time,
642 * and then add it into a corresponding typed flow table.
644 * @param rule the flow rule
647 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
650 calAndSetFlowLiveTypeInternal(rule);
654 * Add the typed flow entry into table, and calculates and set the flow live type,
655 * and then add it into a corresponding typed flow table.
657 * @param rule the flow rule
660 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
663 //rule have to be new DefaultTypedFlowEntry
664 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
666 calAndSetFlowLiveTypeInternal(rule);
667 addWithSetFlowLiveTypeCount++;
669 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
670 + " ADD Failed, cause it may already exists in table !!!,"
671 + " AdaptiveStats collection thread for {}",
// Does the actual work: calculates and sets the flow live type from the entry's life,
// then adds the entry to the corresponding typed flow set
678 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
679 long life = rule.life();
680 FlowLiveType prevFlowLiveType = rule.flowLiveType();
682 if (life >= longPollInterval) {
683 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
685 } else if (life >= midPollInterval) {
686 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
688 } else if (life >= calAndPollInterval) {
689 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
690 shortFlows.add(rule);
691 } else if (life >= 0) {
692 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
694 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
697 if (rule.flowLiveType() != prevFlowLiveType) {
698 switch (prevFlowLiveType) {
699 // delete it from previous flow table
701 shortFlows.remove(rule);
704 midFlows.remove(rule);
707 longFlows.remove(rule);
716 // check the flow live type based on current time, then set and add it into corresponding table
717 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
718 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
719 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
720 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
721 // fe.life() unit is SECOND!
722 long liveTime = fe.life() + fromLastSeen;
725 switch (fe.flowLiveType()) {
727 if (liveTime >= longPollInterval) {
728 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
730 } else if (liveTime >= midPollInterval) {
731 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
733 } else if (liveTime >= calAndPollInterval) {
734 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
739 if (liveTime >= longPollInterval) {
740 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
741 shortFlows.remove(fe);
743 } else if (liveTime >= midPollInterval) {
744 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
745 shortFlows.remove(fe);
750 if (liveTime >= longPollInterval) {
751 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
757 if (fromLastSeen > entirePollInterval) {
758 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
762 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
764 // Error Unknown Live Type
765 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
766 + "AdaptiveStats collection thread for {}",
771 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
772 + ", state=" + fe.state()
773 + ", After liveType=" + fe.flowLiveType()
774 + ", liveTime=" + liveTime
775 + ", life=" + fe.life()
776 + ", bytes=" + fe.bytes()
777 + ", packets=" + fe.packets()
778 + ", fromLastSeen=" + fromLastSeen
779 + ", priority=" + fe.priority()
780 + ", selector=" + fe.selector().criteria()
781 + ", treatment=" + fe.treatment()
782 + " AdaptiveStats collection thread for {}",
789 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
792 public void checkAndMoveLiveFlowAll() {
793 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
795 long calCurTime = System.currentTimeMillis();
796 typedFlowEntries.forEach(fe -> {
797 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
802 // print table counts for debug
803 if (log.isDebugEnabled()) {
804 synchronized (this) {
805 long totalFlowCount = getFlowCount();
806 long shortFlowCount = shortFlows.size();
807 long midFlowCount = midFlows.size();
808 long longFlowCount = longFlows.size();
809 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
810 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
812 log.debug("--------------------------------------------------------------------------- for {}",
814 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
815 + ", add - remove_Count=" + calTotalCount
816 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
817 + ", SHORT_FLOW_Count=" + shortFlowCount
818 + ", MID_FLOW_Count=" + midFlowCount
819 + ", LONG_FLOW_Count=" + longFlowCount
820 + ", add_Count=" + addCount
821 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
822 + ", remove_Count=" + removeCount
823 + " AdaptiveStats collection thread for {}", sw.getStringId());
824 log.debug("--------------------------------------------------------------------------- for {}",
826 if (totalFlowCount != calTotalCount) {
827 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
828 + "calculated total flow count do NOT match, something is wrong internally "
829 + "or check counter value bound is over!");
831 if (immediateFlowCount < 0) {
832 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
833 + "something is wrong internally "
834 + "or check counter value bound is over!");
838 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
842 * Remove the typed flow entry from table.
844 * @param rule the flow rule
847 public synchronized void remove(FlowRule rule) {
850 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
851 if (removeStore != null) {
852 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
853 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
861 // Remove the typed flow entry from corresponding table
862 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
863 switch (fe.flowLiveType()) {
868 shortFlows.remove(fe);
874 longFlows.remove(fe);
876 default: // error in Flow Live Type
877 log.error("removeLiveFlowsInternal, Unknown Live Type error!");