2 * Copyright 2015 Open Networking Laboratory
\r
4 * Licensed under the Apache License, Version 2.0 (the "License");
\r
5 * you may not use this file except in compliance with the License.
\r
6 * You may obtain a copy of the License at
\r
8 * http://www.apache.org/licenses/LICENSE-2.0
\r
10 * Unless required by applicable law or agreed to in writing, software
\r
11 * distributed under the License is distributed on an "AS IS" BASIS,
\r
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
13 * See the License for the specific language governing permissions and
\r
14 * limitations under the License.
\r
17 package org.onosproject.provider.of.flow.impl;
\r
19 import java.util.HashSet;
\r
20 import java.util.List;
\r
21 import java.util.Map;
\r
22 import java.util.Optional;
\r
23 import java.util.Set;
\r
24 import java.util.concurrent.TimeUnit;
\r
25 import java.util.concurrent.ScheduledFuture;
\r
26 import java.util.concurrent.Executors;
\r
27 import java.util.concurrent.ScheduledExecutorService;
\r
29 import com.google.common.base.Objects;
\r
30 import com.google.common.collect.ImmutableSet;
\r
31 import com.google.common.collect.Maps;
\r
32 import com.google.common.collect.Sets;
\r
34 import org.onosproject.net.flow.DefaultTypedFlowEntry;
\r
35 import org.onosproject.net.flow.FlowEntry;
\r
36 import org.onosproject.net.flow.FlowId;
\r
37 import org.onosproject.net.flow.FlowRule;
\r
38 import org.onosproject.net.flow.StoredFlowEntry;
\r
39 import org.onosproject.net.flow.TypedStoredFlowEntry;
\r
40 import org.onosproject.net.flow.instructions.Instruction;
\r
41 import org.onosproject.net.flow.instructions.Instructions;
\r
42 import org.onosproject.openflow.controller.OpenFlowSwitch;
\r
43 import org.onosproject.openflow.controller.RoleState;
\r
44 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
\r
45 import org.projectfloodlight.openflow.protocol.match.Match;
\r
46 import org.projectfloodlight.openflow.types.OFPort;
\r
47 import org.projectfloodlight.openflow.types.TableId;
\r
48 import org.slf4j.Logger;
\r
50 import static com.google.common.base.Preconditions.checkNotNull;
\r
51 import static org.onlab.util.Tools.groupedThreads;
\r
52 import static org.onosproject.net.flow.TypedStoredFlowEntry.*;
\r
53 import static org.slf4j.LoggerFactory.getLogger;
\r
56 * Efficiently and adaptively collects flow statistics for the specified switch.
\r
58 public class NewAdaptiveFlowStatsCollector {
\r
60 private final Logger log = getLogger(getClass());
\r
62 private final OpenFlowSwitch sw;
\r
64 private ScheduledExecutorService adaptiveFlowStatsScheduler =
\r
65 Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
\r
66 private ScheduledFuture<?> calAndShortFlowsThread;
\r
67 private ScheduledFuture<?> midFlowsThread;
\r
68 private ScheduledFuture<?> longFlowsThread;
\r
70 // Task that recalculates every flow entry's FlowLiveType and collects stats of SHORT flows every calAndPollInterval
\r
71 private CalAndShortFlowsTask calAndShortFlowsTask;
\r
72 // Task that collects stats of MID flows every 2 * calAndPollInterval
\r
73 private MidFlowsTask midFlowsTask;
\r
74 // Task that collects stats of LONG flows every 3 * calAndPollInterval
\r
75 private LongFlowsTask longFlowsTask;
\r
77 private static final int CAL_AND_POLL_TIMES = 1; // base multiplier; must not be 0, which would zero every interval derived in initMemberVars
\r
78 private static final int MID_POLL_TIMES = 2; // variable greater or equal than 1
\r
79 private static final int LONG_POLL_TIMES = 3; // variable greater or equal than MID_POLL_TIMES
\r
80 //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
\r
81 // must be a common multiple of MID_POLL_TIMES and LONG_POLL_TIMES, since the task call counters are compared to it with ==
\r
82 private static final int ENTIRE_POLL_TIMES = 6;
\r
84 private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
\r
85 private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
\r
86 private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;
\r
88 private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
\r
89 private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
\r
90 private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
\r
91 // only used for checking condition at each task if it collects entire flows from a given switch or not
\r
92 private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
\r
94 // Number of call count of each Task,
\r
95 // used to skip a task's own collection in the cycle where CalAndShortFlowsTask collects the entire flows
\r
96 private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
\r
97 private int callCountMidFlowsTask = 0; // increased MID_POLL_TIMES whenever Task is called
\r
98 private int callCountLongFlowsTask = 0; // increased LONG_POLL_TIMES whenever Task is called
\r
100 private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();
\r
102 private boolean isFirstTimeStart = true;
\r
104 public static final long NO_FLOW_MISSING_XID = (-1);
\r
105 private long flowMissingXid = NO_FLOW_MISSING_XID;
\r
108 * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
\r
110 * @param sw switch to pull
\r
111 * @param pollInterval cal and immediate poll frequency in seconds
\r
113 NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {
\r
116 initMemberVars(pollInterval);
\r
119 // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
\r
120 private void initMemberVars(int pollInterval) {
\r
121 if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
\r
122 this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
\r
123 } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
\r
124 this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
\r
126 this.calAndPollInterval = pollInterval;
\r
129 calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
\r
130 midPollInterval = MID_POLL_TIMES * calAndPollInterval;
\r
131 longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
\r
132 entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;
\r
134 callCountCalAndShortFlowsTask = 0;
\r
135 callCountMidFlowsTask = 0;
\r
136 callCountLongFlowsTask = 0;
\r
138 flowMissingXid = NO_FLOW_MISSING_XID;
\r
142 * Adjusts adaptive poll frequency.
\r
144 * @param pollInterval poll frequency in seconds
\r
146 synchronized void adjustCalAndPollInterval(int pollInterval) {
\r
147 initMemberVars(pollInterval);
\r
149 if (calAndShortFlowsThread != null) {
\r
150 calAndShortFlowsThread.cancel(false);
\r
152 if (midFlowsThread != null) {
\r
153 midFlowsThread.cancel(false);
\r
155 if (longFlowsThread != null) {
\r
156 longFlowsThread.cancel(false);
\r
159 calAndShortFlowsTask = new CalAndShortFlowsTask();
\r
160 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
161 calAndShortFlowsTask,
\r
163 calAndPollInterval,
\r
166 midFlowsTask = new MidFlowsTask();
\r
167 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
173 longFlowsTask = new LongFlowsTask();
\r
174 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
180 log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");
\r
183 private class CalAndShortFlowsTask implements Runnable {
\r
185 public void run() {
\r
186 if (sw.getRole() == RoleState.MASTER) {
\r
187 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
\r
189 if (isFirstTimeStart) {
\r
190 // isFirstTimeStart, get entire flow stats from a given switch sw
\r
191 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
\r
193 ofFlowStatsRequestAllSend();
\r
195 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
\r
196 isFirstTimeStart = false;
\r
197 } else if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
\r
198 // entire_poll_times, get entire flow stats from a given switch sw
\r
199 log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
\r
200 ofFlowStatsRequestAllSend();
\r
202 callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
\r
203 //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
\r
206 calAndShortFlowsTaskInternal();
\r
207 callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
\r
213 // sends an OpenFlow flow stats request for all flow entries to the given switch sw
\r
214 private void ofFlowStatsRequestAllSend() {
\r
215 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
\r
216 .setMatch(sw.factory().matchWildcardAll())
\r
217 .setTableId(TableId.ALL)
\r
218 .setOutPort(OFPort.NO_MASK)
\r
221 synchronized (this) {
\r
222 // set the request xid to check the reply in OpenFlowRuleProvider
\r
223 // After processing the reply of this request message,
\r
224 // this must be set to NO_FLOW_MISSING_XID(-1) by provider
\r
225 setFlowMissingXid(request.getXid());
\r
226 log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());
\r
228 sw.sendMsg(request);
\r
232 // sends an OpenFlow flow stats request for the specific flow entry (fe) to the given switch sw
\r
233 private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
\r
235 Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch();
\r
236 // set find tableId
\r
237 TableId tableId = TableId.of(fe.tableId());
\r
239 Instruction ins = fe.treatment().allInstructions().stream()
\r
240 .filter(i -> (i.type() == Instruction.Type.OUTPUT))
\r
243 OFPort ofPort = OFPort.NO_MASK;
\r
245 Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
\r
246 ofPort = OFPort.of((int) ((out.port().toLong())));
\r
249 OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
\r
251 .setTableId(tableId)
\r
252 .setOutPort(ofPort)
\r
255 synchronized (this) {
\r
256 if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
\r
257 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
\r
258 + " set no flow missing xid anyway, for {}",
\r
260 setFlowMissingXid(NO_FLOW_MISSING_XID);
\r
263 sw.sendMsg(request);
\r
267 private void calAndShortFlowsTaskInternal() {
\r
268 deviceFlowTable.checkAndMoveLiveFlowAll();
\r
270 deviceFlowTable.getShortFlows().forEach(fe -> {
\r
271 ofFlowStatsRequestFlowSend(fe);
\r
275 private class MidFlowsTask implements Runnable {
\r
277 public void run() {
\r
278 if (sw.getRole() == RoleState.MASTER) {
\r
279 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
\r
281 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
\r
282 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
\r
283 callCountMidFlowsTask = MID_POLL_TIMES;
\r
285 midFlowsTaskInternal();
\r
286 callCountMidFlowsTask += MID_POLL_TIMES;
\r
292 private void midFlowsTaskInternal() {
\r
293 deviceFlowTable.getMidFlows().forEach(fe -> {
\r
294 ofFlowStatsRequestFlowSend(fe);
\r
298 private class LongFlowsTask implements Runnable {
\r
300 public void run() {
\r
301 if (sw.getRole() == RoleState.MASTER) {
\r
302 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());
\r
304 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
\r
305 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
\r
306 callCountLongFlowsTask = LONG_POLL_TIMES;
\r
308 longFlowsTaskInternal();
\r
309 callCountLongFlowsTask += LONG_POLL_TIMES;
\r
315 private void longFlowsTaskInternal() {
\r
316 deviceFlowTable.getLongFlows().forEach(fe -> {
\r
317 ofFlowStatsRequestFlowSend(fe);
\r
322 * Starts adaptive flow statistic collection.
\r
325 public synchronized void start() {
\r
326 log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
\r
327 callCountCalAndShortFlowsTask = 0;
\r
328 callCountMidFlowsTask = 0;
\r
329 callCountLongFlowsTask = 0;
\r
331 isFirstTimeStart = true;
\r
333 // Initially start polling quickly. Then drop down to configured value
\r
334 calAndShortFlowsTask = new CalAndShortFlowsTask();
\r
335 calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
336 calAndShortFlowsTask,
\r
338 calAndPollInterval,
\r
341 midFlowsTask = new MidFlowsTask();
\r
342 midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
348 longFlowsTask = new LongFlowsTask();
\r
349 longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
\r
355 log.info("Started");
\r
359 * Stops adaptive flow statistic collection.
\r
362 public synchronized void stop() {
\r
363 log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
\r
364 if (calAndShortFlowsThread != null) {
\r
365 calAndShortFlowsThread.cancel(true);
\r
367 if (midFlowsThread != null) {
\r
368 midFlowsThread.cancel(true);
\r
370 if (longFlowsThread != null) {
\r
371 longFlowsThread.cancel(true);
\r
374 adaptiveFlowStatsScheduler.shutdownNow();
\r
376 isFirstTimeStart = false;
\r
378 log.info("Stopped");
\r
382 * add typed flow entry from flow rule into the internal flow table.
\r
384 * @param flowRules the flow rules
\r
387 public synchronized void addWithFlowRule(FlowRule... flowRules) {
\r
388 for (FlowRule fr : flowRules) {
\r
389 // First remove old entry unconditionally, if exist
\r
390 deviceFlowTable.remove(fr);
\r
392 // add new flow entry, we suppose IMMEDIATE_FLOW
\r
393 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
\r
394 FlowLiveType.IMMEDIATE_FLOW);
\r
395 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
\r
400 * add or update typed flow entry from flow entry into the internal flow table.
\r
402 * @param flowEntries the flow entries
\r
405 public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
\r
406 for (FlowEntry fe : flowEntries) {
\r
407 // check if this new rule is an update to an existing entry
\r
408 TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);
\r
410 if (stored != null) {
\r
411 // duplicated flow entry is collected!, just skip
\r
412 if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
\r
413 && fe.life() == stored.life()) {
\r
414 log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())
\r
415 + ",is DUPLICATED stats collection, just skip."
\r
416 + " AdaptiveStats collection thread for {}",
\r
419 stored.setLastSeen();
\r
421 } else if (fe.life() < stored.life()) {
\r
422 // Invalid update of the stats values, i.e., bytes, packets, durations ...
\r
423 log.debug("addOrUpdateFlows():" +
\r
424 " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +
\r
425 " new flowId=" + Long.toHexString(fe.id().value()) +
\r
426 ", old flowId=" + Long.toHexString(stored.id().value()) +
\r
427 ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +
\r
428 ", new life=" + fe.life() + ", old life=" + stored.life() +
\r
429 ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());
\r
431 stored.setLastSeen();
\r
436 stored.setLife(fe.life());
\r
437 stored.setPackets(fe.packets());
\r
438 stored.setBytes(fe.bytes());
\r
439 stored.setLastSeen();
\r
440 if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
\r
441 // flow is really RULE_ADDED
\r
442 stored.setState(FlowEntry.FlowEntryState.ADDED);
\r
444 // flow is RULE_UPDATED, skip adding and just updating flow live table
\r
445 //deviceFlowTable.calAndSetFlowLiveType(stored);
\r
449 // add new flow entry, we suppose IMMEDIATE_FLOW
\r
450 TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
\r
451 FlowLiveType.IMMEDIATE_FLOW);
\r
452 deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
\r
457 * remove typed flow entry from the internal flow table.
\r
459 * @param flowRules the flow rules
\r
462 public synchronized void removeFlows(FlowRule... flowRules) {
\r
463 for (FlowRule rule : flowRules) {
\r
464 deviceFlowTable.remove(rule);
\r
468 // same as removeFlows() function
\r
470 * remove typed flow entry from the internal flow table.
\r
472 * @param flowRules the flow rules
\r
475 public void flowRemoved(FlowRule... flowRules) {
\r
476 removeFlows(flowRules);
\r
479 // same as addOrUpdateFlows() function
\r
481 * add or update typed flow entry from flow entry into the internal flow table.
\r
483 * @param flowEntries the flow entry list
\r
486 public void pushFlowMetrics(List<FlowEntry> flowEntries) {
\r
487 flowEntries.forEach(fe -> {
\r
488 addOrUpdateFlows(fe);
\r
493 * Returns flowMissingXid, the xid of the outstanding all-flows stats request, or NO_FLOW_MISSING_XID(-1) if there is none.
\r
495 * @return xid of missing flow
\r
497 public long getFlowMissingXid() {
\r
498 return flowMissingXid;
\r
502 * Sets flowMissingXid, namely the message id (xid) of the OFFlowStatsRequest that matches all flows.
\r
504 * @param flowMissingXid the OFFlowStatsRequest message Id
\r
507 public void setFlowMissingXid(long flowMissingXid) {
\r
508 this.flowMissingXid = flowMissingXid;
\r
511 private class InternalDeviceFlowTable {
\r
513 private final Map<FlowId, Set<TypedStoredFlowEntry>>
\r
514 flowEntries = Maps.newConcurrentMap();
\r
516 private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
\r
517 private final Set<StoredFlowEntry> midFlows = new HashSet<>();
\r
518 private final Set<StoredFlowEntry> longFlows = new HashSet<>();
\r
520 // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
\r
521 private final long latencyFlowStatsRequestAndReplyMillis = 500;
\r
524 // Statistics for table operation
\r
525 private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
\r
526 private long removeCount = 0;
\r
529 * Resets all count values with zero.
\r
532 public void resetAllCount() {
\r
534 addWithSetFlowLiveTypeCount = 0;
\r
538 // get set of flow entries for the given flowId
\r
539 private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
\r
540 return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
\r
543 // get flow entry for the given flow rule
\r
544 private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
\r
545 Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
\r
546 return flowEntries.stream()
\r
547 .filter(entry -> Objects.equal(entry, rule))
\r
552 // get the flow entries for all flows in flow table
\r
553 private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
\r
554 Set<TypedStoredFlowEntry> result = Sets.newHashSet();
\r
556 flowEntries.values().forEach(result::addAll);
\r
561 * Gets the number of flow entries in the flow table.
\r
563 * @return the number of flow entries
\r
566 public long getFlowCount() {
\r
567 return flowEntries.values().stream().mapToLong(Set::size).sum();
\r
571 * Gets the typed flow entry stored for the given flow rule.
\r
573 * @param rule the flow rule
\r
574 * @return the typed flow entry.
\r
577 public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
\r
578 checkNotNull(rule);
\r
580 return getFlowEntryInternal(rule);
\r
584 * Gets all typed flow entries in the flow table.
\r
586 * @return the set of typed flow entry.
\r
589 public Set<TypedStoredFlowEntry> getFlowEntries() {
\r
590 return getFlowEntriesInternal();
\r
594 * Gets the short typed flow entries in flow table.
\r
596 * @return the set of typed flow entry.
\r
599 public Set<StoredFlowEntry> getShortFlows() {
\r
600 return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);
\r
604 * Gets the mid typed flow entries in flow table.
\r
606 * @return the set of typed flow entry.
\r
609 public Set<StoredFlowEntry> getMidFlows() {
\r
610 return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);
\r
614 * Gets the long typed flow entries in flow table.
\r
616 * @return the set of typed flow entry.
\r
619 public Set<StoredFlowEntry> getLongFlows() {
\r
620 return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);
\r
624 * Add typed flow entry into table only.
\r
626 * @param rule the flow rule
\r
629 public synchronized void add(TypedStoredFlowEntry rule) {
\r
630 checkNotNull(rule);
\r
632 //rule have to be new DefaultTypedFlowEntry
\r
633 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
\r
641 * Calculates and sets the flow live type at the first time,
\r
642 * and then add it into a corresponding typed flow table.
\r
644 * @param rule the flow rule
\r
647 public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
\r
648 checkNotNull(rule);
\r
650 calAndSetFlowLiveTypeInternal(rule);
\r
654 * Adds the typed flow entry into the table, calculates and sets the flow live type,
\r
655 * and then add it into a corresponding typed flow table.
\r
657 * @param rule the flow rule
\r
660 public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
\r
661 checkNotNull(rule);
\r
663 //rule have to be new DefaultTypedFlowEntry
\r
664 boolean result = getFlowEntriesInternal(rule.id()).add(rule);
\r
666 calAndSetFlowLiveTypeInternal(rule);
\r
667 addWithSetFlowLiveTypeCount++;
\r
669 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())
\r
670 + " ADD Failed, cause it may already exists in table !!!,"
\r
671 + " AdaptiveStats collection thread for {}",
\r
676 // Actually calculates and sets the flow live type at the first time,
\r
677 // and then add it into a corresponding typed flow table
\r
678 private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
\r
679 long life = rule.life();
\r
680 FlowLiveType prevFlowLiveType = rule.flowLiveType();
\r
682 if (life >= longPollInterval) {
\r
683 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
\r
684 longFlows.add(rule);
\r
685 } else if (life >= midPollInterval) {
\r
686 rule.setFlowLiveType(FlowLiveType.MID_FLOW);
\r
687 midFlows.add(rule);
\r
688 } else if (life >= calAndPollInterval) {
\r
689 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
\r
690 shortFlows.add(rule);
\r
691 } else if (life >= 0) {
\r
692 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
\r
693 } else { // life < 0
\r
694 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
\r
697 if (rule.flowLiveType() != prevFlowLiveType) {
\r
698 switch (prevFlowLiveType) {
\r
699 // delete it from previous flow table
\r
701 shortFlows.remove(rule);
\r
704 midFlows.remove(rule);
\r
707 longFlows.remove(rule);
\r
716 // check the flow live type based on current time, then set and add it into corresponding table
\r
717 private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
\r
718 long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
\r
719 // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
\r
720 long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
\r
721 // fe.life() unit is SECOND!
\r
722 long liveTime = fe.life() + fromLastSeen;
\r
725 switch (fe.flowLiveType()) {
\r
726 case IMMEDIATE_FLOW:
\r
727 if (liveTime >= longPollInterval) {
\r
728 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
\r
730 } else if (liveTime >= midPollInterval) {
\r
731 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
\r
733 } else if (liveTime >= calAndPollInterval) {
\r
734 fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
\r
735 shortFlows.add(fe);
\r
739 if (liveTime >= longPollInterval) {
\r
740 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
\r
741 shortFlows.remove(fe);
\r
743 } else if (liveTime >= midPollInterval) {
\r
744 fe.setFlowLiveType(FlowLiveType.MID_FLOW);
\r
745 shortFlows.remove(fe);
\r
750 if (liveTime >= longPollInterval) {
\r
751 fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
\r
752 midFlows.remove(fe);
\r
757 if (fromLastSeen > entirePollInterval) {
\r
758 log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");
\r
762 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
\r
764 // Error Unknown Live Type
\r
765 log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"
\r
766 + "AdaptiveStats collection thread for {}",
\r
771 log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())
\r
772 + ", state=" + fe.state()
\r
773 + ", After liveType=" + fe.flowLiveType()
\r
774 + ", liveTime=" + liveTime
\r
775 + ", life=" + fe.life()
\r
776 + ", bytes=" + fe.bytes()
\r
777 + ", packets=" + fe.packets()
\r
778 + ", fromLastSeen=" + fromLastSeen
\r
779 + ", priority=" + fe.priority()
\r
780 + ", selector=" + fe.selector().criteria()
\r
781 + ", treatment=" + fe.treatment()
\r
782 + " AdaptiveStats collection thread for {}",
\r
789 * Check and move live type for all type flow entries in table at every calAndPollInterval time.
\r
792 public void checkAndMoveLiveFlowAll() {
\r
793 Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();
\r
795 long calCurTime = System.currentTimeMillis();
\r
796 typedFlowEntries.forEach(fe -> {
\r
797 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
\r
802 // print table counts for debug
\r
803 if (log.isDebugEnabled()) {
\r
804 synchronized (this) {
\r
805 long totalFlowCount = getFlowCount();
\r
806 long shortFlowCount = shortFlows.size();
\r
807 long midFlowCount = midFlows.size();
\r
808 long longFlowCount = longFlows.size();
\r
809 long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
\r
810 long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;
\r
812 log.debug("--------------------------------------------------------------------------- for {}",
\r
814 log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount
\r
815 + ", add - remove_Count=" + calTotalCount
\r
816 + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount
\r
817 + ", SHORT_FLOW_Count=" + shortFlowCount
\r
818 + ", MID_FLOW_Count=" + midFlowCount
\r
819 + ", LONG_FLOW_Count=" + longFlowCount
\r
820 + ", add_Count=" + addCount
\r
821 + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount
\r
822 + ", remove_Count=" + removeCount
\r
823 + " AdaptiveStats collection thread for {}", sw.getStringId());
\r
824 log.debug("--------------------------------------------------------------------------- for {}",
\r
826 if (totalFlowCount != calTotalCount) {
\r
827 log.error("checkAndMoveLiveFlowAll, Real total flow count and "
\r
828 + "calculated total flow count do NOT match, something is wrong internally "
\r
829 + "or check counter value bound is over!");
\r
831 if (immediateFlowCount < 0) {
\r
832 log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "
\r
833 + "something is wrong internally "
\r
834 + "or check counter value bound is over!");
\r
838 log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());
\r
842 * Remove the typed flow entry from table.
\r
844 * @param rule the flow rule
\r
847 public synchronized void remove(FlowRule rule) {
\r
848 checkNotNull(rule);
\r
850 TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
\r
851 if (removeStore != null) {
\r
852 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);
\r
853 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);
\r
861 // Remove the typed flow entry from corresponding table
\r
862 private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
\r
863 switch (fe.flowLiveType()) {
\r
864 case IMMEDIATE_FLOW:
\r
868 shortFlows.remove(fe);
\r
871 midFlows.remove(fe);
\r
874 longFlows.remove(fe);
\r
876 default: // error in Flow Live Type
\r
877 log.error("removeLiveFlowsInternal, Unknown Live Type error!");
\r