575a7153879e637684a1bf9a1b0ef8e2ae0406d3
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.incubator.net.meter.impl;
17
18 import org.apache.felix.scr.annotations.Activate;
19 import org.apache.felix.scr.annotations.Component;
20 import org.apache.felix.scr.annotations.Deactivate;
21 import org.apache.felix.scr.annotations.Reference;
22 import org.apache.felix.scr.annotations.ReferenceCardinality;
23 import org.apache.felix.scr.annotations.Service;
24 import org.onlab.util.TriConsumer;
25 import org.onosproject.net.meter.DefaultMeter;
26 import org.onosproject.net.meter.Meter;
27 import org.onosproject.net.meter.MeterEvent;
28 import org.onosproject.net.meter.MeterFailReason;
29 import org.onosproject.net.meter.MeterId;
30 import org.onosproject.net.meter.MeterListener;
31 import org.onosproject.net.meter.MeterOperation;
32 import org.onosproject.net.meter.MeterProvider;
33 import org.onosproject.net.meter.MeterProviderRegistry;
34 import org.onosproject.net.meter.MeterProviderService;
35 import org.onosproject.net.meter.MeterRequest;
36 import org.onosproject.net.meter.MeterService;
37 import org.onosproject.net.meter.MeterState;
38 import org.onosproject.net.meter.MeterStore;
39 import org.onosproject.net.meter.MeterStoreDelegate;
40 import org.onosproject.net.meter.MeterStoreResult;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
43 import org.onosproject.net.provider.AbstractProviderService;
44 import org.onosproject.store.service.AtomicCounter;
45 import org.onosproject.store.service.StorageService;
46 import org.slf4j.Logger;
47
48 import java.util.Collection;
49 import java.util.Map;
50 import java.util.stream.Collectors;
51
52 import static org.slf4j.LoggerFactory.getLogger;
53
54
55 /**
56  * Provides implementation of the meter service APIs.
57  */
58 @Component(immediate = true, enabled = true)
59 @Service
60 public class MeterManager extends AbstractListenerProviderRegistry<MeterEvent, MeterListener,
61         MeterProvider, MeterProviderService>
62         implements MeterService, MeterProviderRegistry {
63
64     private final String meterIdentifier = "meter-id-counter";
65     private final Logger log = getLogger(getClass());
66     private final MeterStoreDelegate delegate = new InternalMeterStoreDelegate();
67
68     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
69     protected StorageService storageService;
70
71     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
72     protected MeterStore store;
73
74     private AtomicCounter meterIdCounter;
75
76     private TriConsumer<MeterRequest, MeterStoreResult, Throwable> onComplete;
77
78     @Activate
79     public void activate() {
80         meterIdCounter = storageService.atomicCounterBuilder()
81                 .withName(meterIdentifier)
82                 .build();
83
84         store.setDelegate(delegate);
85
86         onComplete = (request, result, error) ->
87             {
88                 request.context().ifPresent(c -> {
89                     if (error != null) {
90                         c.onError(request, MeterFailReason.UNKNOWN);
91                     } else {
92                         if (result.reason().isPresent()) {
93                             c.onError(request, result.reason().get());
94                         } else {
95                             c.onSuccess(request);
96                         }
97                     }
98                 });
99
100             };
101         log.info("Started");
102     }
103
104     @Deactivate
105     public void deactivate() {
106         store.unsetDelegate(delegate);
107         log.info("Stopped");
108     }
109
110     @Override
111     protected MeterProviderService createProviderService(MeterProvider provider) {
112         return new InternalMeterProviderService(provider);
113     }
114
115     @Override
116     public Meter submit(MeterRequest request) {
117
118         Meter.Builder mBuilder = DefaultMeter.builder()
119                 .forDevice(request.deviceId())
120                 .fromApp(request.appId())
121                 .withBands(request.bands())
122                 .withId(allocateMeterId())
123                 .withUnit(request.unit());
124
125         if (request.isBurst()) {
126             mBuilder.burst();
127         }
128         DefaultMeter m = (DefaultMeter) mBuilder.build();
129         m.setState(MeterState.PENDING_ADD);
130         store.storeMeter(m).whenComplete((result, error) ->
131                                                  onComplete.accept(request, result, error));
132         return m;
133     }
134
135     @Override
136     public void withdraw(MeterRequest request, MeterId meterId) {
137         Meter.Builder mBuilder = DefaultMeter.builder()
138                 .forDevice(request.deviceId())
139                 .fromApp(request.appId())
140                 .withBands(request.bands())
141                 .withId(meterId)
142                 .withUnit(request.unit());
143
144         if (request.isBurst()) {
145             mBuilder.burst();
146         }
147
148         DefaultMeter m = (DefaultMeter) mBuilder.build();
149         m.setState(MeterState.PENDING_REMOVE);
150         store.deleteMeter(m).whenComplete((result, error) ->
151                                                   onComplete.accept(request, result, error));
152     }
153
154     @Override
155     public Meter getMeter(MeterId id) {
156         return store.getMeter(id);
157     }
158
159     @Override
160     public Collection<Meter> getAllMeters() {
161         return store.getAllMeters();
162     }
163
164     private MeterId allocateMeterId() {
165         // FIXME: This will break one day.
166         return MeterId.meterId((int) meterIdCounter.incrementAndGet());
167     }
168
169     private class InternalMeterProviderService
170             extends AbstractProviderService<MeterProvider>
171             implements MeterProviderService {
172
173         /**
174          * Creates a provider service on behalf of the specified provider.
175          *
176          * @param provider provider to which this service is being issued
177          */
178         protected InternalMeterProviderService(MeterProvider provider) {
179             super(provider);
180         }
181
182         @Override
183         public void meterOperationFailed(MeterOperation operation,
184                                          MeterFailReason reason) {
185             store.failedMeter(operation, reason);
186         }
187
188         @Override
189         public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
190             //FIXME: FOLLOWING CODE CANNOT BE TESTED UNTIL SOMETHING THAT
191             //FIXME: IMPLEMENTS METERS EXISTS
192             Map<MeterId, Meter> storedMeterMap = store.getAllMeters().stream()
193                     .collect(Collectors.toMap(Meter::id, m -> m));
194
195             meterEntries.stream()
196                     .filter(m -> storedMeterMap.remove(m.id()) != null)
197                     .forEach(m -> store.updateMeterState(m));
198
199             storedMeterMap.values().stream().forEach(m -> {
200                 if (m.state() == MeterState.PENDING_ADD) {
201                     provider().performMeterOperation(m.deviceId(),
202                                                      new MeterOperation(m,
203                                                                         MeterOperation.Type.ADD));
204                 } else {
205                     store.deleteMeterNow(m);
206                 }
207             });
208         }
209     }
210
211     private class InternalMeterStoreDelegate implements MeterStoreDelegate {
212
213         @Override
214         public void notify(MeterEvent event) {
215             DeviceId deviceId = event.subject().deviceId();
216             MeterProvider p = getProvider(event.subject().deviceId());
217             switch (event.type()) {
218                 case METER_ADD_REQ:
219                     p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
220                                                                          MeterOperation.Type.ADD));
221                     break;
222                 case METER_REM_REQ:
223                     p.performMeterOperation(deviceId, new MeterOperation(event.subject(),
224                                                                          MeterOperation.Type.REMOVE));
225                     break;
226                 default:
227                     log.warn("Unknown meter event {}", event.type());
228             }
229
230         }
231     }
232
233 }