2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.incubator.net.meter.impl;
18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.util.TriConsumer;
25 import org.onosproject.net.meter.DefaultMeter;
26 import org.onosproject.net.meter.Meter;
27 import org.onosproject.net.meter.MeterEvent;
28 import org.onosproject.net.meter.MeterFailReason;
29 import org.onosproject.net.meter.MeterId;
30 import org.onosproject.net.meter.MeterListener;
31 import org.onosproject.net.meter.MeterOperation;
32 import org.onosproject.net.meter.MeterProvider;
33 import org.onosproject.net.meter.MeterProviderRegistry;
34 import org.onosproject.net.meter.MeterProviderService;
35 import org.onosproject.net.meter.MeterRequest;
36 import org.onosproject.net.meter.MeterService;
37 import org.onosproject.net.meter.MeterState;
38 import org.onosproject.net.meter.MeterStore;
39 import org.onosproject.net.meter.MeterStoreDelegate;
40 import org.onosproject.net.meter.MeterStoreResult;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
43 import org.onosproject.net.provider.AbstractProviderService;
44 import org.onosproject.store.service.AtomicCounter;
45 import org.onosproject.store.service.StorageService;
46 import org.slf4j.Logger;
48 import java.util.Collection;
50 import java.util.stream.Collectors;
52 import static org.slf4j.LoggerFactory.getLogger;
56 * Provides implementation of the meter service APIs.
58 @Component(immediate = true, enabled = true)
60 public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, MeterListener,
61 MeterProvider, MeterProviderService>
62 implements MeterService, MeterProviderRegistry {
64 private final String meterIdentifier = "meter-id-counter";
65 private final Logger log = getLogger(getClass());
66 private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
68 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69 protected StorageService storageService;
71 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72 protected MeterStore store;
74 private AtomicCounter meterIdCounter;
76 private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
79 public void activate() {
80 meterIdCounter = storageService.atomicCounterBuilder()
81 .withName(meterIdentifier)
84 store.setDelegate(delegate);
86 onComplete = (request, result, error) ->
88 request.context().ifPresent(c -> {
90 c.onError(request, MeterFailReason.UNKNOWN);
92 if (result.reason().isPresent()) {
93 c.onError(request, result.reason().get());
105 public void deactivate() {
106 store.unsetDelegate(delegate);
111 protected MeterProviderService createProviderService(MeterProvider provider) {
112 return new InternalMeterProviderService(provider);
116 public Meter submit(MeterRequest request) {
118 Meter.Builder mBuilder = DefaultMeter.builder()
119 .forDevice(request.deviceId())
120 .fromApp(request.appId())
121 .withBands(request.bands())
122 .withId(allocateMeterId())
123 .withUnit(request.unit());
125 if (request.isBurst()) {
128 DefaultMeter m = (DefaultMeter) mBuilder.build();
129 m.setState(MeterState.PENDING_ADD);
130 store.storeMeter(m).whenComplete((result, error) ->
131 onComplete.accept(request, result, error));
136 public void withdraw(MeterRequest request, MeterId meterId) {
137 Meter.Builder mBuilder = DefaultMeter.builder()
138 .forDevice(request.deviceId())
139 .fromApp(request.appId())
140 .withBands(request.bands())
142 .withUnit(request.unit());
144 if (request.isBurst()) {
148 DefaultMeter m = (DefaultMeter) mBuilder.build();
149 m.setState(MeterState.PENDING_REMOVE);
150 store.deleteMeter(m).whenComplete((result, error) ->
151 onComplete.accept(request, result, error));
155 public Meter getMeter(MeterId id) {
156 return store.getMeter(id);
160 public Collection<Meter> getAllMeters() {
161 return store.getAllMeters();
164 private MeterId allocateMeterId() {
165 // FIXME: This will break one day.
166 return MeterId.meterId((int) meterIdCounter.incrementAndGet());
169 private class InternalMeterProviderService
170 extends AbstractProviderService<MeterProvider>
171 implements MeterProviderService {
174 * Creates a provider service on behalf of the specified provider.
176 * @param provider provider to which this service is being issued
178 protected InternalMeterProviderService(MeterProvider provider) {
183 public void meterOperationFailed(MeterOperation operation,
184 MeterFailReason reason) {
185 store.failedMeter(operation, reason);
189 public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
190 //FIXME: FOLLOWING CODE CANNOT BE TESTED UNTIL SOMETHING THAT
191 //FIXME: IMPLEMENTS METERS EXISTS
192 Map<MeterId, Meter> storedMeterMap = store.getAllMeters().stream()
193 .collect(Collectors.toMap(Meter::id, m -> m));
195 meterEntries.stream()
196 .filter(m -> storedMeterMap.remove(m.id()) != null)
197 .forEach(m -> store.updateMeterState(m));
199 storedMeterMap.values().stream().forEach(m -> {
200 if (m.state() == MeterState.PENDING_ADD) {
201 provider().performMeterOperation(m.deviceId(),
202 new MeterOperation(m,
203 MeterOperation.Type.ADD));
205 store.deleteMeterNow(m);
211 private class InternalMeterStoreDelegate implements MeterStoreDelegate {
214 public void notify(MeterEvent event) {
215 DeviceId deviceId = event.subject().deviceId();
216 MeterProvider p = getProvider(event.subject().deviceId());
217 switch (event.type()) {
219 p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
220 MeterOperation.Type.ADD));
223 p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
224 MeterOperation.Type.REMOVE));
227 log.warn("Unknown meter event {}", event.type());