2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.statistic.impl;
18 import com.google.common.collect.Sets;
20 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate;
23 import org.apache.felix.scr.annotations.Reference;
24 import org.apache.felix.scr.annotations.ReferenceCardinality;
25 import org.apache.felix.scr.annotations.Service;
26 import org.onlab.util.KryoNamespace;
27 import org.onlab.util.Tools;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.NodeId;
30 import org.onosproject.mastership.MastershipService;
31 import org.onosproject.net.ConnectPoint;
32 import org.onosproject.net.DeviceId;
33 import org.onosproject.net.PortNumber;
34 import org.onosproject.net.flow.FlowEntry;
35 import org.onosproject.net.flow.FlowRule;
36 import org.onosproject.net.flow.instructions.Instruction;
37 import org.onosproject.net.flow.instructions.Instructions;
38 import org.onosproject.net.statistic.StatisticStore;
39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40 import org.onosproject.store.serializers.KryoNamespaces;
41 import org.onosproject.store.serializers.KryoSerializer;
42 import org.slf4j.Logger;
44 import java.util.Collections;
45 import java.util.HashSet;
48 import java.util.concurrent.ConcurrentHashMap;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicInteger;
54 import static org.onlab.util.Tools.groupedThreads;
55 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
56 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
57 import static org.slf4j.LoggerFactory.getLogger;
61 * Maintains statistics using RPC calls to collect stats from remote instances
64 @Component(immediate = true)
66 public class DistributedStatisticStore implements StatisticStore {
68 private final Logger log = getLogger(getClass());
70 // TODO: Make configurable.
71 private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74 protected MastershipService mastershipService;
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected ClusterCommunicationService clusterCommunicator;
79 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80 protected ClusterService clusterService;
82 private Map<ConnectPoint, InternalStatisticRepresentation> representations =
83 new ConcurrentHashMap<>();
85 private Map<ConnectPoint, Set<FlowEntry>> previous =
86 new ConcurrentHashMap<>();
88 private Map<ConnectPoint, Set<FlowEntry>> current =
89 new ConcurrentHashMap<>();
91 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
93 protected void setupKryoPool() {
94 serializerPool = KryoNamespace.newBuilder()
95 .register(KryoNamespaces.API)
96 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
97 // register this store specific classes here
102 private ExecutorService messageHandlingExecutor;
104 private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
107 public void activate() {
109 messageHandlingExecutor = Executors.newFixedThreadPool(
110 MESSAGE_HANDLER_THREAD_POOL_SIZE,
111 groupedThreads("onos/store/statistic", "message-handlers"));
113 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
115 this::getCurrentStatisticInternal,
117 messageHandlingExecutor);
119 clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
121 this::getPreviousStatisticInternal,
123 messageHandlingExecutor);
129 public void deactivate() {
130 clusterCommunicator.removeSubscriber(GET_PREVIOUS);
131 clusterCommunicator.removeSubscriber(GET_CURRENT);
132 messageHandlingExecutor.shutdown();
137 public void prepareForStatistics(FlowRule rule) {
138 ConnectPoint cp = buildConnectPoint(rule);
142 InternalStatisticRepresentation rep;
143 synchronized (representations) {
144 rep = getOrCreateRepresentation(cp);
150 public synchronized void removeFromStatistics(FlowRule rule) {
151 ConnectPoint cp = buildConnectPoint(rule);
155 InternalStatisticRepresentation rep = representations.get(cp);
156 if (rep != null && rep.remove(rule)) {
157 updatePublishedStats(cp, Collections.emptySet());
159 Set<FlowEntry> values = current.get(cp);
160 if (values != null) {
163 values = previous.get(cp);
164 if (values != null) {
171 public void addOrUpdateStatistic(FlowEntry rule) {
172 ConnectPoint cp = buildConnectPoint(rule);
176 InternalStatisticRepresentation rep = representations.get(cp);
177 if (rep != null && rep.submit(rule)) {
178 updatePublishedStats(cp, rep.get());
182 private synchronized void updatePublishedStats(ConnectPoint cp,
183 Set<FlowEntry> flowEntries) {
184 Set<FlowEntry> curr = current.get(cp);
186 curr = new HashSet<>();
188 previous.put(cp, curr);
189 current.put(cp, flowEntries);
194 public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
195 final DeviceId deviceId = connectPoint.deviceId();
196 NodeId master = mastershipService.getMasterFor(deviceId);
197 if (master == null) {
198 log.warn("No master for {}", deviceId);
199 return Collections.emptySet();
201 if (master.equals(clusterService.getLocalNode().id())) {
202 return getCurrentStatisticInternal(connectPoint);
204 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
210 STATISTIC_STORE_TIMEOUT_MILLIS,
211 TimeUnit.MILLISECONDS,
212 Collections.emptySet());
217 private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
218 return current.get(connectPoint);
222 public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
223 final DeviceId deviceId = connectPoint.deviceId();
224 NodeId master = mastershipService.getMasterFor(deviceId);
225 if (master == null) {
226 log.warn("No master for {}", deviceId);
227 return Collections.emptySet();
229 if (master.equals(clusterService.getLocalNode().id())) {
230 return getPreviousStatisticInternal(connectPoint);
232 return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
238 STATISTIC_STORE_TIMEOUT_MILLIS,
239 TimeUnit.MILLISECONDS,
240 Collections.emptySet());
244 private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
245 return previous.get(connectPoint);
248 private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
250 if (representations.containsKey(cp)) {
251 return representations.get(cp);
253 InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
254 representations.put(cp, rep);
260 private ConnectPoint buildConnectPoint(FlowRule rule) {
261 PortNumber port = getOutput(rule);
266 ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
270 private PortNumber getOutput(FlowRule rule) {
271 for (Instruction i : rule.treatment().allInstructions()) {
272 if (i.type() == Instruction.Type.OUTPUT) {
273 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
276 if (i.type() == Instruction.Type.DROP) {
277 return PortNumber.P0;
283 private class InternalStatisticRepresentation {
285 private final AtomicInteger counter = new AtomicInteger(0);
286 private final Set<FlowEntry> rules = new HashSet<>();
288 public void prepare() {
289 counter.incrementAndGet();
292 public synchronized boolean remove(FlowRule rule) {
294 return counter.decrementAndGet() == 0;
297 public synchronized boolean submit(FlowEntry rule) {
298 if (rules.contains(rule)) {
302 if (counter.get() == 0) {
305 return counter.decrementAndGet() == 0;
309 public synchronized Set<FlowEntry> get() {
310 counter.set(rules.size());
311 return Sets.newHashSet(rules);