487cae9652d43f24d91c4249defd0624f412eb2f
[onosfw.git] /
1 /*\r
2  * Copyright 2015 Open Networking Laboratory\r
3  *\r
4  * Licensed under the Apache License, Version 2.0 (the "License");\r
5  * you may not use this file except in compliance with the License.\r
6  * You may obtain a copy of the License at\r
7  *\r
8  *     http://www.apache.org/licenses/LICENSE-2.0\r
9  *\r
10  * Unless required by applicable law or agreed to in writing, software\r
11  * distributed under the License is distributed on an "AS IS" BASIS,\r
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
13  * See the License for the specific language governing permissions and\r
14  * limitations under the License.\r
15  */\r
16 \r
17 package org.onosproject.provider.of.flow.impl;\r
18 \r
19 import java.util.HashSet;\r
20 import java.util.List;\r
21 import java.util.Map;\r
22 import java.util.Optional;\r
23 import java.util.Set;\r
24 import java.util.concurrent.TimeUnit;\r
25 import java.util.concurrent.ScheduledFuture;\r
26 import java.util.concurrent.Executors;\r
27 import java.util.concurrent.ScheduledExecutorService;\r
28 \r
29 import com.google.common.base.Objects;\r
30 import com.google.common.collect.ImmutableSet;\r
31 import com.google.common.collect.Maps;\r
32 import com.google.common.collect.Sets;\r
33 \r
34 import org.onosproject.net.flow.DefaultTypedFlowEntry;\r
35 import org.onosproject.net.flow.FlowEntry;\r
36 import org.onosproject.net.flow.FlowId;\r
37 import org.onosproject.net.flow.FlowRule;\r
38 import org.onosproject.net.flow.StoredFlowEntry;\r
39 import org.onosproject.net.flow.TypedStoredFlowEntry;\r
40 import org.onosproject.net.flow.instructions.Instruction;\r
41 import org.onosproject.net.flow.instructions.Instructions;\r
42 import org.onosproject.openflow.controller.OpenFlowSwitch;\r
43 import org.onosproject.openflow.controller.RoleState;\r
44 import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;\r
45 import org.projectfloodlight.openflow.protocol.match.Match;\r
46 import org.projectfloodlight.openflow.types.OFPort;\r
47 import org.projectfloodlight.openflow.types.TableId;\r
48 import org.slf4j.Logger;\r
49 \r
50 import static com.google.common.base.Preconditions.checkNotNull;\r
51 import static org.onlab.util.Tools.groupedThreads;\r
52 import static org.onosproject.net.flow.TypedStoredFlowEntry.*;\r
53 import static org.slf4j.LoggerFactory.getLogger;\r
54 \r
55 /**\r
56  * Efficiently and adaptively collects flow statistics for the specified switch.\r
57  */\r
58 public class NewAdaptiveFlowStatsCollector {\r
59 \r
    private final Logger log = getLogger(getClass());

    // Switch whose flow statistics this collector polls.
    private final OpenFlowSwitch sw;

    // Shared scheduler for the three periodic polling tasks below.
    private ScheduledExecutorService adaptiveFlowStatsScheduler =
            Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));
    // Handles of the scheduled tasks; null until a task has been scheduled.
    private ScheduledFuture<?> calAndShortFlowsThread;
    private ScheduledFuture<?> midFlowsThread;
    private ScheduledFuture<?> longFlowsThread;

    // Task that recalculates the live type of the flow entries and polls the
    // short flows every calAndPollInterval seconds
    private CalAndShortFlowsTask calAndShortFlowsTask;
    // Task that polls the MID flows every midPollInterval (MID_POLL_TIMES * calAndPollInterval) seconds
    private MidFlowsTask midFlowsTask;
    // Task that polls the LONG flows every longPollInterval (LONG_POLL_TIMES * calAndPollInterval) seconds
    private LongFlowsTask longFlowsTask;

    // Multipliers applied to the base poll interval, and the per-run increments of the call counters.
    // NOTE(review): the original comment said "must be always 0", which contradicts the value 1;
    // the call counters only reach ENTIRE_POLL_TIMES in unit steps when this is 1.
    private static final int CAL_AND_POLL_TIMES = 1;
    private static final int MID_POLL_TIMES = 2;     // must be greater than or equal to 1
    private static final int LONG_POLL_TIMES = 3;    // must be greater than or equal to MID_POLL_TIMES
    //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
    // must be greater than or equal to a common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
    private static final int ENTIRE_POLL_TIMES = 6;

    // Default and bounds of the base poll interval, in seconds.
    private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
    private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
    private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;

    // Effective intervals in seconds; all of them are derived in initMemberVars().
    private int calAndPollInterval; // CAL_AND_POLL_TIMES * (clamped poll interval)
    private int midPollInterval; // MID_POLL_TIMES * calAndPollInterval
    private int longPollInterval; // LONG_POLL_TIMES * calAndPollInterval
    // only used to decide whether a task should skip because the entire flow table is collected instead
    private int entirePollInterval; // ENTIRE_POLL_TIMES * calAndPollInterval

    // Per-task call counters. When a counter reaches ENTIRE_POLL_TIMES, CalAndShortFlowsTask
    // collects the entire flow table and the mid/long tasks skip their own collection.
    private int callCountCalAndShortFlowsTask = 0; // increased by CAL_AND_POLL_TIMES whenever the task runs
    private int callCountMidFlowsTask = 0;   // increased by MID_POLL_TIMES whenever the task runs
    private int callCountLongFlowsTask = 0;  // increased by LONG_POLL_TIMES whenever the task runs

    // Internal per-device table of typed flow entries, bucketed by live type.
    private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();

    // True until the first run of CalAndShortFlowsTask after start(), which collects the entire table.
    private boolean isFirstTimeStart = true;

    // Sentinel meaning "no entire-table stats request is outstanding".
    public static final long NO_FLOW_MISSING_XID = (-1);
    // Xid of the last entire-table stats request, or NO_FLOW_MISSING_XID.
    private long flowMissingXid = NO_FLOW_MISSING_XID;
106 \r
107     /**\r
108      * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.\r
109      *\r
110      * @param sw           switch to pull\r
111      * @param pollInterval cal and immediate poll frequency in seconds\r
112      */\r
113     NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {\r
114         this.sw = sw;\r
115 \r
116         initMemberVars(pollInterval);\r
117     }\r
118 \r
119     // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count\r
120     private void initMemberVars(int pollInterval) {\r
121         if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {\r
122             this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;\r
123         } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {\r
124             this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;\r
125         } else {\r
126             this.calAndPollInterval = pollInterval;\r
127         }\r
128 \r
129         calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;\r
130         midPollInterval = MID_POLL_TIMES * calAndPollInterval;\r
131         longPollInterval = LONG_POLL_TIMES * calAndPollInterval;\r
132         entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;\r
133 \r
134         callCountCalAndShortFlowsTask = 0;\r
135         callCountMidFlowsTask = 0;\r
136         callCountLongFlowsTask = 0;\r
137 \r
138         flowMissingXid = NO_FLOW_MISSING_XID;\r
139     }\r
140 \r
141     /**\r
142      * Adjusts adaptive poll frequency.\r
143      *\r
144      * @param pollInterval poll frequency in seconds\r
145      */\r
146     synchronized void adjustCalAndPollInterval(int pollInterval) {\r
147         initMemberVars(pollInterval);\r
148 \r
149         if (calAndShortFlowsThread != null) {\r
150             calAndShortFlowsThread.cancel(false);\r
151         }\r
152         if (midFlowsThread != null) {\r
153             midFlowsThread.cancel(false);\r
154         }\r
155         if (longFlowsThread != null) {\r
156             longFlowsThread.cancel(false);\r
157         }\r
158 \r
159         calAndShortFlowsTask = new CalAndShortFlowsTask();\r
160         calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
161                 calAndShortFlowsTask,\r
162                 0,\r
163                 calAndPollInterval,\r
164                 TimeUnit.SECONDS);\r
165 \r
166         midFlowsTask = new MidFlowsTask();\r
167         midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
168                 midFlowsTask,\r
169                 0,\r
170                 midPollInterval,\r
171                 TimeUnit.SECONDS);\r
172 \r
173         longFlowsTask = new LongFlowsTask();\r
174         longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
175                 longFlowsTask,\r
176                 0,\r
177                 longPollInterval,\r
178                 TimeUnit.SECONDS);\r
179 \r
180         log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");\r
181     }\r
182 \r
183     private class CalAndShortFlowsTask implements Runnable {\r
184         @Override\r
185         public void run() {\r
186             if (sw.getRole() == RoleState.MASTER) {\r
187                 log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());\r
188 \r
189                 if (isFirstTimeStart) {\r
190                     // isFirstTimeStart, get entire flow stats from a given switch sw\r
191                     log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",\r
192                             sw.getStringId());\r
193                     ofFlowStatsRequestAllSend();\r
194 \r
195                     callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;\r
196                     isFirstTimeStart = false;\r
197                 } else  if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {\r
198                     // entire_poll_times, get entire flow stats from a given switch sw\r
199                     log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());\r
200                     ofFlowStatsRequestAllSend();\r
201 \r
202                     callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;\r
203                     //TODO: check flows deleted in switch, but exist in controller flow table, then remove them\r
204                     //\r
205                 } else {\r
206                     calAndShortFlowsTaskInternal();\r
207                     callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;\r
208                 }\r
209             }\r
210         }\r
211     }\r
212 \r
213     // send openflow flow stats request message with getting all flow entries to a given switch sw\r
214     private void ofFlowStatsRequestAllSend() {\r
215         OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()\r
216                 .setMatch(sw.factory().matchWildcardAll())\r
217                 .setTableId(TableId.ALL)\r
218                 .setOutPort(OFPort.NO_MASK)\r
219                 .build();\r
220 \r
221         synchronized (this) {\r
222             // set the request xid to check the reply in OpenFlowRuleProvider\r
223             // After processing the reply of this request message,\r
224             // this must be set to NO_FLOW_MISSING_XID(-1) by provider\r
225             setFlowMissingXid(request.getXid());\r
226             log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());\r
227 \r
228             sw.sendMsg(request);\r
229         }\r
230     }\r
231 \r
232     // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw\r
233     private void ofFlowStatsRequestFlowSend(FlowEntry fe) {\r
234         // set find match\r
235         Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch();\r
236         // set find tableId\r
237         TableId tableId = TableId.of(fe.tableId());\r
238         // set output port\r
239         Instruction ins = fe.treatment().allInstructions().stream()\r
240                 .filter(i -> (i.type() == Instruction.Type.OUTPUT))\r
241                 .findFirst()\r
242                 .orElse(null);\r
243         OFPort ofPort = OFPort.NO_MASK;\r
244         if (ins != null) {\r
245             Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;\r
246             ofPort = OFPort.of((int) ((out.port().toLong())));\r
247         }\r
248 \r
249         OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()\r
250                 .setMatch(match)\r
251                 .setTableId(tableId)\r
252                 .setOutPort(ofPort)\r
253                 .build();\r
254 \r
255         synchronized (this) {\r
256             if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {\r
257                 log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"\r
258                                 + " set no flow missing xid anyway, for {}",\r
259                         sw.getStringId());\r
260                 setFlowMissingXid(NO_FLOW_MISSING_XID);\r
261             }\r
262 \r
263             sw.sendMsg(request);\r
264         }\r
265     }\r
266 \r
267     private void calAndShortFlowsTaskInternal() {\r
268         deviceFlowTable.checkAndMoveLiveFlowAll();\r
269 \r
270         deviceFlowTable.getShortFlows().forEach(fe -> {\r
271             ofFlowStatsRequestFlowSend(fe);\r
272         });\r
273     }\r
274 \r
275     private class MidFlowsTask implements Runnable {\r
276         @Override\r
277         public void run() {\r
278             if (sw.getRole() == RoleState.MASTER) {\r
279                 log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());\r
280 \r
281                 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw\r
282                 if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {\r
283                     callCountMidFlowsTask = MID_POLL_TIMES;\r
284                 } else {\r
285                     midFlowsTaskInternal();\r
286                     callCountMidFlowsTask += MID_POLL_TIMES;\r
287                 }\r
288             }\r
289         }\r
290     }\r
291 \r
292     private void midFlowsTaskInternal() {\r
293         deviceFlowTable.getMidFlows().forEach(fe -> {\r
294             ofFlowStatsRequestFlowSend(fe);\r
295         });\r
296     }\r
297 \r
298     private class LongFlowsTask implements Runnable {\r
299         @Override\r
300         public void run() {\r
301             if (sw.getRole() == RoleState.MASTER) {\r
302                 log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());\r
303 \r
304                 // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw\r
305                 if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {\r
306                     callCountLongFlowsTask = LONG_POLL_TIMES;\r
307                 } else {\r
308                     longFlowsTaskInternal();\r
309                     callCountLongFlowsTask += LONG_POLL_TIMES;\r
310                 }\r
311             }\r
312         }\r
313     }\r
314 \r
315     private void longFlowsTaskInternal() {\r
316         deviceFlowTable.getLongFlows().forEach(fe -> {\r
317             ofFlowStatsRequestFlowSend(fe);\r
318         });\r
319     }\r
320 \r
321     /**\r
322      * start adaptive flow statistic collection.\r
323      *\r
324      */\r
325     public synchronized void start() {\r
326         log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());\r
327         callCountCalAndShortFlowsTask = 0;\r
328         callCountMidFlowsTask = 0;\r
329         callCountLongFlowsTask = 0;\r
330 \r
331         isFirstTimeStart = true;\r
332 \r
333         // Initially start polling quickly. Then drop down to configured value\r
334         calAndShortFlowsTask = new CalAndShortFlowsTask();\r
335         calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
336                 calAndShortFlowsTask,\r
337                 1,\r
338                 calAndPollInterval,\r
339                 TimeUnit.SECONDS);\r
340 \r
341         midFlowsTask = new MidFlowsTask();\r
342         midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
343                 midFlowsTask,\r
344                 1,\r
345                 midPollInterval,\r
346                 TimeUnit.SECONDS);\r
347 \r
348         longFlowsTask = new LongFlowsTask();\r
349         longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(\r
350                 longFlowsTask,\r
351                 1,\r
352                 longPollInterval,\r
353                 TimeUnit.SECONDS);\r
354 \r
355         log.info("Started");\r
356     }\r
357 \r
358     /**\r
359      * stop adaptive flow statistic collection.\r
360      *\r
361      */\r
362     public synchronized void stop() {\r
363         log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());\r
364         if (calAndShortFlowsThread != null) {\r
365             calAndShortFlowsThread.cancel(true);\r
366         }\r
367         if (midFlowsThread != null) {\r
368             midFlowsThread.cancel(true);\r
369         }\r
370         if (longFlowsThread != null) {\r
371             longFlowsThread.cancel(true);\r
372         }\r
373 \r
374         adaptiveFlowStatsScheduler.shutdownNow();\r
375 \r
376         isFirstTimeStart = false;\r
377 \r
378         log.info("Stopped");\r
379     }\r
380 \r
381     /**\r
382      * add typed flow entry from flow rule into the internal flow table.\r
383      *\r
384      * @param flowRules the flow rules\r
385      *\r
386      */\r
387     public synchronized void addWithFlowRule(FlowRule... flowRules) {\r
388         for (FlowRule fr : flowRules) {\r
389             // First remove old entry unconditionally, if exist\r
390             deviceFlowTable.remove(fr);\r
391 \r
392             // add new flow entry, we suppose IMMEDIATE_FLOW\r
393             TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,\r
394                     FlowLiveType.IMMEDIATE_FLOW);\r
395             deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);\r
396         }\r
397     }\r
398 \r
399     /**\r
400      * add or update typed flow entry from flow entry into the internal flow table.\r
401      *\r
402      * @param flowEntries the flow entries\r
403      *\r
404      */\r
405     public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {\r
406        for (FlowEntry fe : flowEntries) {\r
407            // check if this new rule is an update to an existing entry\r
408            TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);\r
409 \r
410            if (stored != null) {\r
411                // duplicated flow entry is collected!, just skip\r
412                if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()\r
413                        && fe.life() == stored.life()) {\r
414                    log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())\r
415                                    + ",is DUPLICATED stats collection, just skip."\r
416                                    + " AdaptiveStats collection thread for {}",\r
417                            sw.getStringId());\r
418 \r
419                    stored.setLastSeen();\r
420                    continue;\r
421                } else if (fe.life() < stored.life()) {\r
422                    // Invalid updates the stats values, i.e., bytes, packets, durations ...\r
423                    log.debug("addOrUpdateFlows():" +\r
424                                " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +\r
425                                " new flowId=" + Long.toHexString(fe.id().value()) +\r
426                                ", old flowId=" + Long.toHexString(stored.id().value()) +\r
427                                ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +\r
428                                ", new life=" + fe.life() + ", old life=" + stored.life() +\r
429                                ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());\r
430                    // go next\r
431                    stored.setLastSeen();\r
432                    continue;\r
433                }\r
434 \r
435                // update now\r
436                stored.setLife(fe.life());\r
437                stored.setPackets(fe.packets());\r
438                stored.setBytes(fe.bytes());\r
439                stored.setLastSeen();\r
440                if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {\r
441                    // flow is really RULE_ADDED\r
442                    stored.setState(FlowEntry.FlowEntryState.ADDED);\r
443                }\r
444                // flow is RULE_UPDATED, skip adding and just updating flow live table\r
445                //deviceFlowTable.calAndSetFlowLiveType(stored);\r
446                continue;\r
447            }\r
448 \r
449            // add new flow entry, we suppose IMMEDIATE_FLOW\r
450            TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,\r
451                     FlowLiveType.IMMEDIATE_FLOW);\r
452            deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);\r
453         }\r
454     }\r
455 \r
456     /**\r
457      * remove typed flow entry from the internal flow table.\r
458      *\r
459      * @param flowRules the flow entries\r
460      *\r
461      */\r
462     public synchronized void removeFlows(FlowRule...  flowRules) {\r
463         for (FlowRule rule : flowRules) {\r
464             deviceFlowTable.remove(rule);\r
465         }\r
466     }\r
467 \r
468     // same as removeFlows() function\r
469     /**\r
470      * remove typed flow entry from the internal flow table.\r
471      *\r
472      * @param flowRules the flow entries\r
473      *\r
474      */\r
475     public void flowRemoved(FlowRule... flowRules) {\r
476         removeFlows(flowRules);\r
477     }\r
478 \r
479     // same as addOrUpdateFlows() function\r
480     /**\r
481      * add or update typed flow entry from flow entry into the internal flow table.\r
482      *\r
483      * @param flowEntries the flow entry list\r
484      *\r
485      */\r
486     public void pushFlowMetrics(List<FlowEntry> flowEntries) {\r
487         flowEntries.forEach(fe -> {\r
488             addOrUpdateFlows(fe);\r
489         });\r
490     }\r
491 \r
492     /**\r
493      * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).\r
494      *\r
495      * @return xid of missing flow\r
496      */\r
497     public long getFlowMissingXid() {\r
498         return flowMissingXid;\r
499     }\r
500 \r
501     /**\r
502      * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.\r
503      *\r
504      * @param flowMissingXid the OFFlowStatsRequest message Id\r
505      *\r
506      */\r
507     public void setFlowMissingXid(long flowMissingXid) {\r
508         this.flowMissingXid = flowMissingXid;\r
509     }\r
510 \r
511     private class InternalDeviceFlowTable {\r
512 \r
        // All typed flow entries of the device, grouped by flow id.
        private final Map<FlowId, Set<TypedStoredFlowEntry>>
                flowEntries = Maps.newConcurrentMap();

        // Buckets of entries by live type; the polling tasks read them through
        // getShortFlows()/getMidFlows()/getLongFlows().
        private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
        private final Set<StoredFlowEntry> midFlows = new HashSet<>();
        private final Set<StoredFlowEntry> longFlows = new HashSet<>();

        // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
        private final long latencyFlowStatsRequestAndReplyMillis = 500;


        // Statistics for table operation
        private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
        private long removeCount = 0;
527 \r
528         /**\r
529          * Resets all count values with zero.\r
530          *\r
531          */\r
532         public void resetAllCount() {\r
533             addCount = 0;\r
534             addWithSetFlowLiveTypeCount = 0;\r
535             removeCount = 0;\r
536         }\r
537 \r
538         // get set of flow entries for the given flowId\r
539         private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {\r
540             return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());\r
541         }\r
542 \r
543         // get flow entry for the given flow rule\r
544         private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {\r
545             Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());\r
546             return flowEntries.stream()\r
547                     .filter(entry -> Objects.equal(entry, rule))\r
548                     .findAny()\r
549                     .orElse(null);\r
550         }\r
551 \r
552         // get the flow entries for all flows in flow table\r
553         private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {\r
554             Set<TypedStoredFlowEntry> result = Sets.newHashSet();\r
555 \r
556             flowEntries.values().forEach(result::addAll);\r
557             return result;\r
558         }\r
559 \r
560         /**\r
561          * Gets the number of flow entry in flow table.\r
562          *\r
563          * @return the number of flow entry.\r
564          *\r
565          */\r
566         public long getFlowCount() {\r
567             return flowEntries.values().stream().mapToLong(Set::size).sum();\r
568         }\r
569 \r
570         /**\r
571          * Gets the number of flow entry in flow table.\r
572          *\r
573          * @param rule the flow rule\r
574          * @return the typed flow entry.\r
575          *\r
576          */\r
577         public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {\r
578             checkNotNull(rule);\r
579 \r
580             return getFlowEntryInternal(rule);\r
581         }\r
582 \r
583         /**\r
584          * Gets the all typed flow entries in flow table.\r
585          *\r
586          * @return the set of typed flow entry.\r
587          *\r
588          */\r
589         public Set<TypedStoredFlowEntry> getFlowEntries() {\r
590             return getFlowEntriesInternal();\r
591         }\r
592 \r
593         /**\r
594          * Gets the short typed flow entries in flow table.\r
595          *\r
596          * @return the set of typed flow entry.\r
597          *\r
598          */\r
599         public Set<StoredFlowEntry> getShortFlows() {\r
600             return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);\r
601         }\r
602 \r
603         /**\r
604          * Gets the mid typed flow entries in flow table.\r
605          *\r
606          * @return the set of typed flow entry.\r
607          *\r
608          */\r
609         public Set<StoredFlowEntry> getMidFlows() {\r
610             return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);\r
611         }\r
612 \r
613         /**\r
614          * Gets the long typed flow entries in flow table.\r
615          *\r
616          * @return the set of typed flow entry.\r
617          *\r
618          */\r
619         public Set<StoredFlowEntry> getLongFlows() {\r
620             return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);\r
621         }\r
622 \r
623         /**\r
624          * Add typed flow entry into table only.\r
625          *\r
626          * @param rule the flow rule\r
627          *\r
628          */\r
629         public synchronized void add(TypedStoredFlowEntry rule) {\r
630             checkNotNull(rule);\r
631 \r
632             //rule have to be new DefaultTypedFlowEntry\r
633             boolean result = getFlowEntriesInternal(rule.id()).add(rule);\r
634 \r
635             if (result) {\r
636                 addCount++;\r
637             }\r
638         }\r
639 \r
640         /**\r
641          * Calculates and set the flow live type at the first time,\r
642          * and then add it into a corresponding typed flow table.\r
643          *\r
644          * @param rule the flow rule\r
645          *\r
646          */\r
647         public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {\r
648             checkNotNull(rule);\r
649 \r
650             calAndSetFlowLiveTypeInternal(rule);\r
651         }\r
652 \r
653         /**\r
654          * Add the typed flow entry into table, and calculates and set the flow live type,\r
655          * and then add it into a corresponding typed flow table.\r
656          *\r
657          * @param rule the flow rule\r
658          *\r
659          */\r
660        public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {\r
661             checkNotNull(rule);\r
662 \r
663             //rule have to be new DefaultTypedFlowEntry\r
664             boolean result = getFlowEntriesInternal(rule.id()).add(rule);\r
665             if (result) {\r
666                 calAndSetFlowLiveTypeInternal(rule);\r
667                 addWithSetFlowLiveTypeCount++;\r
668             } else {\r
669                 log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())\r
670                                 + " ADD Failed, cause it may already exists in table !!!,"\r
671                                 + " AdaptiveStats collection thread for {}",\r
672                         sw.getStringId());\r
673             }\r
674         }\r
675 \r
676         // In real, calculates and set the flow live type at the first time,\r
677         // and then add it into a corresponding typed flow table\r
678         private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {\r
679             long life = rule.life();\r
680             FlowLiveType prevFlowLiveType = rule.flowLiveType();\r
681 \r
682             if (life >= longPollInterval) {\r
683                 rule.setFlowLiveType(FlowLiveType.LONG_FLOW);\r
684                 longFlows.add(rule);\r
685             } else if (life >= midPollInterval) {\r
686                 rule.setFlowLiveType(FlowLiveType.MID_FLOW);\r
687                 midFlows.add(rule);\r
688             } else if (life >= calAndPollInterval) {\r
689                 rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);\r
690                 shortFlows.add(rule);\r
691             } else if (life >= 0) {\r
692                 rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);\r
693             } else { // life < 0\r
694                 rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);\r
695             }\r
696 \r
697             if (rule.flowLiveType() != prevFlowLiveType) {\r
698                 switch (prevFlowLiveType) {\r
699                     // delete it from previous flow table\r
700                     case SHORT_FLOW:\r
701                         shortFlows.remove(rule);\r
702                         break;\r
703                     case MID_FLOW:\r
704                         midFlows.remove(rule);\r
705                         break;\r
706                     case LONG_FLOW:\r
707                         longFlows.remove(rule);\r
708                         break;\r
709                     default:\r
710                         break;\r
711                 }\r
712             }\r
713         }\r
714 \r
715 \r
716         // check the flow live type based on current time, then set and add it into corresponding table\r
717         private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {\r
718             long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());\r
719             // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply\r
720             long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);\r
721             // fe.life() unit is SECOND!\r
722             long liveTime = fe.life() + fromLastSeen;\r
723 \r
724 \r
725             switch (fe.flowLiveType()) {\r
726                 case IMMEDIATE_FLOW:\r
727                     if (liveTime >= longPollInterval) {\r
728                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);\r
729                          longFlows.add(fe);\r
730                     } else if (liveTime >= midPollInterval) {\r
731                         fe.setFlowLiveType(FlowLiveType.MID_FLOW);\r
732                         midFlows.add(fe);\r
733                     } else if (liveTime >= calAndPollInterval) {\r
734                         fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);\r
735                         shortFlows.add(fe);\r
736                     }\r
737                     break;\r
738                 case SHORT_FLOW:\r
739                     if (liveTime >= longPollInterval) {\r
740                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);\r
741                         shortFlows.remove(fe);\r
742                         longFlows.add(fe);\r
743                     } else if (liveTime >= midPollInterval) {\r
744                         fe.setFlowLiveType(FlowLiveType.MID_FLOW);\r
745                         shortFlows.remove(fe);\r
746                         midFlows.add(fe);\r
747                     }\r
748                     break;\r
749                 case MID_FLOW:\r
750                     if (liveTime >= longPollInterval) {\r
751                         fe.setFlowLiveType(FlowLiveType.LONG_FLOW);\r
752                         midFlows.remove(fe);\r
753                         longFlows.add(fe);\r
754                     }\r
755                     break;\r
756                 case LONG_FLOW:\r
757                     if (fromLastSeen > entirePollInterval) {\r
758                         log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");\r
759                         return false;\r
760                     }\r
761                     break;\r
762                 case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through\r
763                 default :\r
764                     // Error Unknown Live Type\r
765                     log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"\r
766                             + "AdaptiveStats collection thread for {}",\r
767                             sw.getStringId());\r
768                     return false;\r
769             }\r
770 \r
771             log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())\r
772                             + ", state=" + fe.state()\r
773                             + ", After liveType=" + fe.flowLiveType()\r
774                             + ", liveTime=" + liveTime\r
775                             + ", life=" + fe.life()\r
776                             + ", bytes=" + fe.bytes()\r
777                             + ", packets=" + fe.packets()\r
778                             + ", fromLastSeen=" + fromLastSeen\r
779                             + ", priority=" + fe.priority()\r
780                             + ", selector=" + fe.selector().criteria()\r
781                             + ", treatment=" + fe.treatment()\r
782                             + " AdaptiveStats collection thread for {}",\r
783                     sw.getStringId());\r
784 \r
785             return true;\r
786         }\r
787 \r
788         /**\r
789          * Check and move live type for all type flow entries in table at every calAndPollInterval time.\r
790          *\r
791          */\r
792         public void checkAndMoveLiveFlowAll() {\r
793             Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();\r
794 \r
795             long calCurTime = System.currentTimeMillis();\r
796             typedFlowEntries.forEach(fe -> {\r
797                 if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {\r
798                     remove(fe);\r
799                 }\r
800             });\r
801 \r
802             // print table counts for debug\r
803             if (log.isDebugEnabled()) {\r
804                 synchronized (this) {\r
805                     long totalFlowCount = getFlowCount();\r
806                     long shortFlowCount = shortFlows.size();\r
807                     long midFlowCount = midFlows.size();\r
808                     long longFlowCount = longFlows.size();\r
809                     long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;\r
810                     long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;\r
811 \r
812                     log.debug("--------------------------------------------------------------------------- for {}",\r
813                             sw.getStringId());\r
814                     log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount\r
815                             + ", add - remove_Count=" + calTotalCount\r
816                             + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount\r
817                             + ", SHORT_FLOW_Count=" + shortFlowCount\r
818                             + ", MID_FLOW_Count=" + midFlowCount\r
819                             + ", LONG_FLOW_Count=" + longFlowCount\r
820                             + ", add_Count=" + addCount\r
821                             + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount\r
822                             + ", remove_Count=" + removeCount\r
823                             + " AdaptiveStats collection thread for {}", sw.getStringId());\r
824                     log.debug("--------------------------------------------------------------------------- for {}",\r
825                             sw.getStringId());\r
826                     if (totalFlowCount != calTotalCount) {\r
827                         log.error("checkAndMoveLiveFlowAll, Real total flow count and "\r
828                                 + "calculated total flow count do NOT match, something is wrong internally "\r
829                                 + "or check counter value bound is over!");\r
830                     }\r
831                     if (immediateFlowCount < 0) {\r
832                         log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "\r
833                                 + "something is wrong internally "\r
834                                 + "or check counter value bound is over!");\r
835                     }\r
836                 }\r
837             }\r
838             log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());\r
839         }\r
840 \r
841         /**\r
842          * Remove the typed flow entry from table.\r
843          *\r
844          * @param rule the flow rule\r
845          *\r
846          */\r
847         public synchronized void remove(FlowRule rule) {\r
848             checkNotNull(rule);\r
849 \r
850             TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);\r
851             if (removeStore != null) {\r
852                 removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);\r
853                 boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);\r
854 \r
855                 if (result) {\r
856                     removeCount++;\r
857                 }\r
858             }\r
859        }\r
860 \r
861         // Remove the typed flow entry from corresponding table\r
862         private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {\r
863             switch (fe.flowLiveType()) {\r
864                 case IMMEDIATE_FLOW:\r
865                     // do nothing\r
866                     break;\r
867                 case SHORT_FLOW:\r
868                     shortFlows.remove(fe);\r
869                     break;\r
870                 case MID_FLOW:\r
871                     midFlows.remove(fe);\r
872                     break;\r
873                 case LONG_FLOW:\r
874                     longFlows.remove(fe);\r
875                     break;\r
876                 default: // error in Flow Live Type\r
877                     log.error("removeLiveFlowsInternal, Unknown Live Type error!");\r
878                     break;\r
879             }\r
880         }\r
881     }\r
882 }\r