32890cb1680ee84cd71e6302afbb182d2e5da238
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.incubator.store.meter.impl;
17
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.Maps;
20 import org.apache.felix.scr.annotations.Activate;
21 import org.apache.felix.scr.annotations.Component;
22 import org.apache.felix.scr.annotations.Deactivate;
23 import org.apache.felix.scr.annotations.Reference;
24 import org.apache.felix.scr.annotations.ReferenceCardinality;
25 import org.apache.felix.scr.annotations.Service;
26 import org.onosproject.cluster.ClusterService;
27 import org.onosproject.cluster.NodeId;
28 import org.onosproject.mastership.MastershipService;
29 import org.onosproject.net.meter.Band;
30 import org.onosproject.net.meter.DefaultBand;
31 import org.onosproject.net.meter.DefaultMeter;
32 import org.onosproject.net.meter.Meter;
33 import org.onosproject.net.meter.MeterEvent;
34 import org.onosproject.net.meter.MeterFailReason;
35 import org.onosproject.net.meter.MeterId;
36 import org.onosproject.net.meter.MeterOperation;
37 import org.onosproject.net.meter.MeterState;
38 import org.onosproject.net.meter.MeterStore;
39 import org.onosproject.net.meter.MeterStoreDelegate;
40 import org.onosproject.net.meter.MeterStoreResult;
41 import org.onosproject.store.AbstractStore;
42 import org.onosproject.store.serializers.KryoNamespaces;
43 import org.onosproject.store.service.ConsistentMap;
44 import org.onosproject.store.service.MapEvent;
45 import org.onosproject.store.service.MapEventListener;
46 import org.onosproject.store.service.Serializer;
47 import org.onosproject.store.service.StorageException;
48 import org.onosproject.store.service.StorageService;
49 import org.onosproject.store.service.Versioned;
50 import org.slf4j.Logger;
51
52 import java.util.Arrays;
53 import java.util.Collection;
54 import java.util.Map;
55 import java.util.concurrent.CompletableFuture;
56
57 import static org.slf4j.LoggerFactory.getLogger;
58
59 /**
60  * A distributed meter store implementation. Meters are stored consistently
61  * across the cluster.
62  */
63 @Component(immediate = true)
64 @Service
65 public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
66                     implements MeterStore {
67
68     private Logger log = getLogger(getClass());
69
70     private static final String METERSTORE = "onos-meter-store";
71
72     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73     private StorageService storageService;
74
75     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
76     private MastershipService mastershipService;
77
78     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
79     private ClusterService clusterService;
80
81     private ConsistentMap<MeterId, MeterData> meters;
82     private NodeId local;
83
84     private MapEventListener mapListener = new InternalMapEventListener();
85
86     private Map<MeterId, CompletableFuture<MeterStoreResult>> futures =
87             Maps.newConcurrentMap();
88
89     @Activate
90     public void activate() {
91
92         local = clusterService.getLocalNode().id();
93
94
95         meters = storageService.<MeterId, MeterData>consistentMapBuilder()
96                     .withName(METERSTORE)
97                     .withSerializer(Serializer.using(Arrays.asList(KryoNamespaces.API),
98                                                      MeterData.class,
99                                                      DefaultMeter.class,
100                                                      DefaultBand.class,
101                                                      Band.Type.class,
102                                                      MeterState.class,
103                                                      Meter.Unit.class,
104                                                      MeterFailReason.class,
105                                                      MeterId.class)).build();
106
107         meters.addListener(mapListener);
108
109         log.info("Started");
110     }
111
112     @Deactivate
113     public void deactivate() {
114
115         meters.removeListener(mapListener);
116         log.info("Stopped");
117     }
118
119
120     @Override
121     public CompletableFuture<MeterStoreResult> storeMeter(Meter meter) {
122         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
123         futures.put(meter.id(), future);
124         MeterData data = new MeterData(meter, null, local);
125
126         try {
127             meters.put(meter.id(), data);
128         } catch (StorageException e) {
129             future.completeExceptionally(e);
130         }
131
132         return future;
133
134     }
135
136     @Override
137     public CompletableFuture<MeterStoreResult> deleteMeter(Meter meter) {
138         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
139         futures.put(meter.id(), future);
140
141         MeterData data = new MeterData(meter, null, local);
142
143         // update the state of the meter. It will be pruned by observing
144         // that it has been removed from the dataplane.
145         try {
146             if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
147                 future.complete(MeterStoreResult.success());
148             }
149         } catch (StorageException e) {
150             future.completeExceptionally(e);
151         }
152
153
154         return future;
155     }
156
157     @Override
158     public CompletableFuture<MeterStoreResult> updateMeter(Meter meter) {
159         CompletableFuture<MeterStoreResult> future = new CompletableFuture<>();
160         futures.put(meter.id(), future);
161
162         MeterData data = new MeterData(meter, null, local);
163         try {
164             if (meters.computeIfPresent(meter.id(), (k, v) -> data) == null) {
165                 future.complete(MeterStoreResult.fail(MeterFailReason.INVALID_METER));
166             }
167         } catch (StorageException e) {
168             future.completeExceptionally(e);
169         }
170         return future;
171     }
172
173     @Override
174     public void updateMeterState(Meter meter) {
175         meters.computeIfPresent(meter.id(), (id, v) -> {
176             DefaultMeter m = (DefaultMeter) v.meter();
177             m.setState(meter.state());
178             m.setProcessedPackets(meter.packetsSeen());
179             m.setProcessedBytes(meter.bytesSeen());
180             m.setLife(meter.life());
181             // TODO: Prune if drops to zero.
182             m.setReferenceCount(meter.referenceCount());
183             return new MeterData(m, null, v.origin());
184         });
185     }
186
187     @Override
188     public Meter getMeter(MeterId meterId) {
189         MeterData data = Versioned.valueOrElse(meters.get(meterId), null);
190         return data == null ? null : data.meter();
191     }
192
193     @Override
194     public Collection<Meter> getAllMeters() {
195         return Collections2.transform(meters.asJavaMap().values(),
196                                       MeterData::meter);
197     }
198
199     @Override
200     public void failedMeter(MeterOperation op, MeterFailReason reason) {
201         meters.computeIfPresent(op.meter().id(), (k, v) ->
202                 new MeterData(v.meter(), reason, v.origin()));
203     }
204
205     @Override
206     public void deleteMeterNow(Meter m) {
207         futures.remove(m.id());
208         meters.remove(m.id());
209     }
210
211     private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
212         @Override
213         public void event(MapEvent<MeterId, MeterData> event) {
214             MeterData data = event.value().value();
215             NodeId master = mastershipService.getMasterFor(data.meter().deviceId());
216             switch (event.type()) {
217                 case INSERT:
218                 case UPDATE:
219                         switch (data.meter().state()) {
220                             case PENDING_ADD:
221                             case PENDING_REMOVE:
222                                 if (!data.reason().isPresent() && local.equals(master)) {
223                                     notifyDelegate(
224                                             new MeterEvent(data.meter().state() == MeterState.PENDING_ADD ?
225                                                     MeterEvent.Type.METER_ADD_REQ : MeterEvent.Type.METER_REM_REQ,
226                                                                   data.meter()));
227                                 } else if (data.reason().isPresent() && local.equals(data.origin())) {
228                                     MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
229                                     //TODO: No future -> no friend
230                                     futures.get(data.meter().id()).complete(msr);
231                                 }
232                                 break;
233                             case ADDED:
234                                 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_ADD) {
235                                     futures.remove(data.meter().id()).complete(MeterStoreResult.success());
236                                 }
237                                 break;
238                             case REMOVED:
239                                 if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
240                                     futures.remove(data.meter().id()).complete(MeterStoreResult.success());
241                                 }
242                                 break;
243                             default:
244                                 log.warn("Unknown meter state type {}", data.meter().state());
245                         }
246                     break;
247                 case REMOVE:
248                     //Only happens at origin so we do not need to care.
249                     break;
250                 default:
251                     log.warn("Unknown Map event type {}", event.type());
252             }
253
254         }
255     }
256
257
258 }