2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.incubator.store.meter.impl;
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.Maps;
20 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate;
23 import org.apache.felix.scr.annotations.Reference;
24 import org.apache.felix.scr.annotations.ReferenceCardinality;
25 import org.apache.felix.scr.annotations.Service;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.mastership.MastershipService;
29 import org.onosproject.net.meter.Band;
30 import org.onosproject.net.meter.DefaultBand;
31 import org.onosproject.net.meter.DefaultMeter;
32 import org.onosproject.net.meter.Meter;
33 import org.onosproject.net.meter.MeterEvent;
34 import org.onosproject.net.meter.MeterFailReason;
35 import org.onosproject.net.meter.MeterId;
36 import org.onosproject.net.meter.MeterOperation;
37 import org.onosproject.net.meter.MeterState;
38 import org.onosproject.net.meter.MeterStore;
39 import org.onosproject.net.meter.MeterStoreDelegate;
40 import org.onosproject.net.meter.MeterStoreResult;
41 import org.onosproject.store.AbstractStore;
42 import org.onosproject.store.serializers.KryoNamespaces;
43 import org.onosproject.store.service.ConsistentMap;
44 import org.onosproject.store.service.MapEvent;
45 import org.onosproject.store.service.MapEventListener;
46 import org.onosproject.store.service.Serializer;
47 import org.onosproject.store.service.StorageException;
48 import org.onosproject.store.service.StorageService;
49 import org.onosproject.store.service.Versioned;
50 import org.slf4j.Logger;
52 import java.util.Arrays;
53 import java.util.Collection;
55 import java.util.concurrent.CompletableFuture;
57 import static org.slf4j.LoggerFactory.getLogger;
60 * A distributed meter store implementation. Meters are stored in a consistent map obtained from the storage service.
63 @Component(immediate = true)
65 public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
66 implements MeterStore {
68 private Logger log = getLogger(getClass());
70 private static final String METERSTORE = "onos-meter-store";
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 private StorageService storageService;
75 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76 private MastershipService mastershipService;
78 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79 private ClusterService clusterService;
81 private ConsistentMap<MeterId, MeterData> meters;
84 private MapEventListener mapListener = new InternalMapEventListener();
86 private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
87 Maps.newConcurrentMap();
// Start-up hook: records the local node id, builds the meter map through the
// storage service and attaches the map listener to it.
90 public void activate() {
// Remember this node's id; it is passed to every MeterData this store creates.
92 local = clusterService.getLocalNode().id();
95 meters = storageService.<MeterId, MeterData>consistentMapBuilder()
// The serializer is built from KryoNamespaces.API plus the classes listed below.
// NOTE(review): the file imports Band, DefaultBand and DefaultMeter, which suggests
// more classes are registered than are visible here - confirm against the full file.
97 .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
104 MeterFailReason.class,
105 MeterId.class)).build();
// From here on, changes to the map are delivered to mapListener.
107 meters.addListener(mapListener);
// Shutdown hook: detaches mapListener from the meter map.
113 public void deactivate() {
115 meters.removeListener(mapListener);
121 public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
122 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
123 futures.put(meter.id(), future);
124 MeterData data = new MeterData(meter, null, local);
127 meters.put(meter.id(), data);
128 } catch (StorageException e) {
129 future.completeExceptionally(e);
137 public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
138 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
139 futures.put(meter.id(), future);
141 MeterData data = new MeterData(meter, null, local);
143 // update the state of the meter. It will be pruned by observing
144 // that it has been removed from the dataplane.
146 if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
147 future.complete(MeterStoreResult.success());
149 } catch (StorageException e) {
150 future.completeExceptionally(e);
158 public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
159 CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
160 futures.put(meter.id(), future);
162 MeterData data = new MeterData(meter, null, local);
164 if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
165 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
167 } catch (StorageException e) {
168 future.completeExceptionally(e);
174 public void updateMeterState(Meter meter) {
175 meters.computeIfPresent(meter.id(), (id, v) -> {
176 DefaultMeter m = (DefaultMeter) v.meter();
177 m.setState(meter.state());
178 m.setProcessedPackets(meter.packetsSeen());
179 m.setProcessedBytes(meter.bytesSeen());
180 m.setLife(meter.life());
181 // TODO: Prune if drops to zero.
182 m.setReferenceCount(meter.referenceCount());
183 return new MeterData(m, null, v.origin());
188 public Meter getMeter(MeterId meterId) {
189 MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
190 return data == null ? null : data.meter();
// Returns the meters held in the map, produced by applying Collections2.transform
// to the values of the map's Java-map form.
// NOTE(review): the function passed to transform is not visible in this section;
// presumably it unwraps each MeterData to its Meter - confirm against the full file.
194 public Collection<Meter> getAllMeters() {
195 return Collections2.transform(meters.asJavaMap().values(),
200 public void failedMeter(MeterOperation op, MeterFailReason reason) {
201 meters.computeIfPresent(op.meter().id(), (k, v) ->
202 new MeterData(v.meter(), reason, v.origin()));
206 public void deleteMeterNow(Meter m) {
207 futures.remove(m.id());
208 meters.remove(m.id());
// Listener for changes to the meter map. Depending on the event type and the meter's
// state it either notifies with a MeterEvent or completes the future registered under
// the meter's id.
// NOTE(review): the case labels of both switch statements are not visible in this
// section, so the comments below describe the visible conditions only.
211 private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
// Handles one map event.
213 public void event(MapEvent<MeterId, MeterData> event) {
// Record carried by the event, and the node mastering the device of its meter.
214 MeterData data = event.value().value();
215 NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
216 switch (event.type()) {
219 switch (data.meter().state()) {
// No failure recorded and this node masters the device: build an add request when the
// meter is PENDING_ADD, a remove request otherwise.
222 if (!data.reason().isPresent() && local.equals(master)) {
224 new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
225 MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
// A failure was recorded and this node originated the record: report the failure
// through the registered future.
227 } else if (data.reason().isPresent() && local.equals(data.origin())) {
228 MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
229 // TODO: futures.get(...) returns null when no future is registered under this id; guard before calling complete().
230 futures.get(data.meter().id()).complete(msr);
// Originating node only: unregister the future and complete it successfully.
// NOTE(review): futures.remove(...) returns null when nothing is registered under the
// id, so the chained complete() would throw - confirm a future is always present here.
234 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
235 futures.remove(data.meter().id()).complete(MeterStoreResult.success());
// Same handling for a meter in PENDING_REMOVE; the same null caveat applies.
239 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
240 futures.remove(data.meter().id()).complete(MeterStoreResult.success());
244 log.warn("Unknown meter state type {}", data.meter().state());
248 // Only happens at the origin, so nothing needs to be done here.
251 log.warn("Unknown Map event type {}", event.type());