d54347309b50d3c608f571e558b50ac31fcecd3d
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.statistic.impl;
17
18 import com.google.common.collect.Sets;
19
20 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate;
23 import org.apache.felix.scr.annotations.Reference;
24 import org.apache.felix.scr.annotations.ReferenceCardinality;
25 import org.apache.felix.scr.annotations.Service;
26 import org.onlab.util.KryoNamespace;
27 import org.onlab.util.Tools;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.NodeId;
30 import org.onosproject.mastership.MastershipService;
31 import org.onosproject.net.ConnectPoint;
32 import org.onosproject.net.DeviceId;
33 import org.onosproject.net.PortNumber;
34 import org.onosproject.net.flow.FlowEntry;
35 import org.onosproject.net.flow.FlowRule;
36 import org.onosproject.net.flow.instructions.Instruction;
37 import org.onosproject.net.flow.instructions.Instructions;
38 import org.onosproject.net.statistic.StatisticStore;
39 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
40 import org.onosproject.store.serializers.KryoNamespaces;
41 import org.onosproject.store.serializers.KryoSerializer;
42 import org.slf4j.Logger;
43
44 import java.util.Collections;
45 import java.util.HashSet;
46 import java.util.Map;
47 import java.util.Set;
48 import java.util.concurrent.ConcurrentHashMap;
49 import java.util.concurrent.ExecutorService;
50 import java.util.concurrent.Executors;
51 import java.util.concurrent.TimeUnit;
52 import java.util.concurrent.atomic.AtomicInteger;
53
54 import static org.onlab.util.Tools.groupedThreads;
55 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
56 import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
57 import static org.slf4j.LoggerFactory.getLogger;
58
59
60 /**
61  * Maintains statistics using RPC calls to collect stats from remote instances
62  * on demand.
63  */
64 @Component(immediate = true)
65 @Service
66 public class DistributedStatisticStore implements StatisticStore {
67
68     private final Logger log = getLogger(getClass());
69
70     // TODO: Make configurable.
71     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
72
73     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
74     protected MastershipService mastershipService;
75
76     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77     protected ClusterCommunicationService clusterCommunicator;
78
79     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
80     protected ClusterService clusterService;
81
82     private Map<ConnectPoint, InternalStatisticRepresentation> representations =
83             new ConcurrentHashMap<>();
84
85     private Map<ConnectPoint, Set<FlowEntry>> previous =
86             new ConcurrentHashMap<>();
87
88     private Map<ConnectPoint, Set<FlowEntry>> current =
89             new ConcurrentHashMap<>();
90
91     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
92         @Override
93         protected void setupKryoPool() {
94             serializerPool = KryoNamespace.newBuilder()
95                     .register(KryoNamespaces.API)
96                     .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
97                     // register this store specific classes here
98                     .build();
99         }
100     };;
101
102     private ExecutorService messageHandlingExecutor;
103
104     private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
105
106     @Activate
107     public void activate() {
108
109         messageHandlingExecutor = Executors.newFixedThreadPool(
110                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
111                 groupedThreads("onos/store/statistic", "message-handlers"));
112
113         clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_CURRENT,
114                 SERIALIZER::decode,
115                 this::getCurrentStatisticInternal,
116                 SERIALIZER::encode,
117                 messageHandlingExecutor);
118
119         clusterCommunicator.<ConnectPoint, Set<FlowEntry>>addSubscriber(GET_PREVIOUS,
120                 SERIALIZER::decode,
121                 this::getPreviousStatisticInternal,
122                 SERIALIZER::encode,
123                 messageHandlingExecutor);
124
125         log.info("Started");
126     }
127
128     @Deactivate
129     public void deactivate() {
130         clusterCommunicator.removeSubscriber(GET_PREVIOUS);
131         clusterCommunicator.removeSubscriber(GET_CURRENT);
132         messageHandlingExecutor.shutdown();
133         log.info("Stopped");
134     }
135
136     @Override
137     public void prepareForStatistics(FlowRule rule) {
138         ConnectPoint cp = buildConnectPoint(rule);
139         if (cp == null) {
140             return;
141         }
142         InternalStatisticRepresentation rep;
143         synchronized (representations) {
144             rep = getOrCreateRepresentation(cp);
145         }
146         rep.prepare();
147     }
148
149     @Override
150     public synchronized void removeFromStatistics(FlowRule rule) {
151         ConnectPoint cp = buildConnectPoint(rule);
152         if (cp == null) {
153             return;
154         }
155         InternalStatisticRepresentation rep = representations.get(cp);
156         if (rep != null && rep.remove(rule)) {
157             updatePublishedStats(cp, Collections.emptySet());
158         }
159         Set<FlowEntry> values = current.get(cp);
160         if (values != null) {
161             values.remove(rule);
162         }
163         values = previous.get(cp);
164         if (values != null) {
165             values.remove(rule);
166         }
167
168     }
169
170     @Override
171     public void addOrUpdateStatistic(FlowEntry rule) {
172         ConnectPoint cp = buildConnectPoint(rule);
173         if (cp == null) {
174             return;
175         }
176         InternalStatisticRepresentation rep = representations.get(cp);
177         if (rep != null && rep.submit(rule)) {
178             updatePublishedStats(cp, rep.get());
179         }
180     }
181
182     private synchronized void updatePublishedStats(ConnectPoint cp,
183                                                    Set<FlowEntry> flowEntries) {
184         Set<FlowEntry> curr = current.get(cp);
185         if (curr == null) {
186             curr = new HashSet<>();
187         }
188         previous.put(cp, curr);
189         current.put(cp, flowEntries);
190
191     }
192
193     @Override
194     public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
195         final DeviceId deviceId = connectPoint.deviceId();
196         NodeId master = mastershipService.getMasterFor(deviceId);
197         if (master == null) {
198             log.warn("No master for {}", deviceId);
199             return Collections.emptySet();
200         }
201         if (master.equals(clusterService.getLocalNode().id())) {
202             return getCurrentStatisticInternal(connectPoint);
203         } else {
204             return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
205                                         connectPoint,
206                                         GET_CURRENT,
207                                         SERIALIZER::encode,
208                                         SERIALIZER::decode,
209                                         master),
210                                    STATISTIC_STORE_TIMEOUT_MILLIS,
211                                    TimeUnit.MILLISECONDS,
212                                    Collections.emptySet());
213         }
214
215     }
216
217     private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
218         return current.get(connectPoint);
219     }
220
221     @Override
222     public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
223         final DeviceId deviceId = connectPoint.deviceId();
224         NodeId master = mastershipService.getMasterFor(deviceId);
225         if (master == null) {
226             log.warn("No master for {}", deviceId);
227             return Collections.emptySet();
228         }
229         if (master.equals(clusterService.getLocalNode().id())) {
230             return getPreviousStatisticInternal(connectPoint);
231         } else {
232             return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
233                                         connectPoint,
234                                         GET_PREVIOUS,
235                                         SERIALIZER::encode,
236                                         SERIALIZER::decode,
237                                         master),
238                                    STATISTIC_STORE_TIMEOUT_MILLIS,
239                                    TimeUnit.MILLISECONDS,
240                                    Collections.emptySet());
241         }
242     }
243
244     private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
245         return previous.get(connectPoint);
246     }
247
248     private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
249
250         if (representations.containsKey(cp)) {
251             return representations.get(cp);
252         } else {
253             InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
254             representations.put(cp, rep);
255             return rep;
256         }
257
258     }
259
260     private ConnectPoint buildConnectPoint(FlowRule rule) {
261         PortNumber port = getOutput(rule);
262
263         if (port == null) {
264             return null;
265         }
266         ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
267         return cp;
268     }
269
270     private PortNumber getOutput(FlowRule rule) {
271         for (Instruction i : rule.treatment().allInstructions()) {
272             if (i.type() == Instruction.Type.OUTPUT) {
273                 Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
274                 return out.port();
275             }
276             if (i.type() == Instruction.Type.DROP) {
277                 return PortNumber.P0;
278             }
279         }
280         return null;
281     }
282
283     private class InternalStatisticRepresentation {
284
285         private final AtomicInteger counter = new AtomicInteger(0);
286         private final Set<FlowEntry> rules = new HashSet<>();
287
288         public void prepare() {
289             counter.incrementAndGet();
290         }
291
292         public synchronized boolean remove(FlowRule rule) {
293             rules.remove(rule);
294             return counter.decrementAndGet() == 0;
295         }
296
297         public synchronized boolean submit(FlowEntry rule) {
298             if (rules.contains(rule)) {
299                 rules.remove(rule);
300             }
301             rules.add(rule);
302             if (counter.get() == 0) {
303                 return true;
304             } else {
305                 return counter.decrementAndGet() == 0;
306             }
307         }
308
309         public synchronized Set<FlowEntry> get() {
310             counter.set(rules.size());
311             return Sets.newHashSet(rules);
312         }
313
314
315     }
316
317 }