c6d300c95cff0f79e563f2f369c920ba4be6038b
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package org.onosproject.store.consistent.impl;
18
19 import com.google.common.cache.CacheBuilder;
20 import com.google.common.cache.CacheLoader;
21 import com.google.common.cache.LoadingCache;
22 import com.google.common.collect.Maps;
23 import org.onlab.util.HexString;
24 import org.onlab.util.SharedExecutors;
25 import org.onlab.util.Tools;
26 import org.onosproject.core.ApplicationId;
27 import org.onosproject.store.service.AsyncConsistentMap;
28 import org.onosproject.store.service.ConsistentMapException;
29 import org.onosproject.store.service.MapEvent;
30 import org.onosproject.store.service.MapEventListener;
31 import org.onosproject.store.service.Serializer;
32 import org.onosproject.store.service.Versioned;
33 import org.slf4j.Logger;
34
35 import java.util.Collection;
36 import java.util.Map;
37 import java.util.Map.Entry;
38 import java.util.Objects;
39 import java.util.Set;
40 import java.util.concurrent.CompletableFuture;
41 import java.util.concurrent.CopyOnWriteArraySet;
42 import java.util.concurrent.atomic.AtomicReference;
43 import java.util.function.BiFunction;
44 import java.util.function.Function;
45 import java.util.function.Predicate;
46 import java.util.stream.Collectors;
47
48 import static com.google.common.base.Preconditions.checkNotNull;
49 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.MAP_UPDATE;
50 import static org.onosproject.store.consistent.impl.StateMachineUpdate.Target.TX_COMMIT;
51 import static org.slf4j.LoggerFactory.getLogger;
52
53 /**
54  * AsyncConsistentMap implementation that is backed by a Raft consensus
55  * based database.
56  *
57  * @param <K> type of key.
58  * @param <V> type of value.
59  */
60 public class DefaultAsyncConsistentMap<K, V>  implements AsyncConsistentMap<K, V> {
61
62     private final String name;
63     private final ApplicationId applicationId;
64     private final Database database;
65     private final Serializer serializer;
66     private final boolean readOnly;
67     private final boolean purgeOnUninstall;
68
69     private static final String PRIMITIVE_NAME = "consistentMap";
70     private static final String SIZE = "size";
71     private static final String IS_EMPTY = "isEmpty";
72     private static final String CONTAINS_KEY = "containsKey";
73     private static final String CONTAINS_VALUE = "containsValue";
74     private static final String GET = "get";
75     private static final String COMPUTE_IF = "computeIf";
76     private static final String PUT = "put";
77     private static final String PUT_AND_GET = "putAndGet";
78     private static final String PUT_IF_ABSENT = "putIfAbsent";
79     private static final String REMOVE = "remove";
80     private static final String CLEAR = "clear";
81     private static final String KEY_SET = "keySet";
82     private static final String VALUES = "values";
83     private static final String ENTRY_SET = "entrySet";
84     private static final String REPLACE = "replace";
85     private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
86
87     private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
88
89     private final Logger log = getLogger(getClass());
90     private final MeteringAgent monitor;
91
92     private static final String ERROR_NULL_KEY = "Key cannot be null";
93     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
94
95     private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
96             .softValues()
97             .build(new CacheLoader<K, String>() {
98
99                 @Override
100                 public String load(K key) {
101                     return HexString.toHexString(serializer.encode(key));
102                 }
103             });
104
105     protected K dK(String key) {
106         return serializer.decode(HexString.fromHexString(key));
107     }
108
109     public DefaultAsyncConsistentMap(String name,
110                                      ApplicationId applicationId,
111                                      Database database,
112                                      Serializer serializer,
113                                      boolean readOnly,
114                                      boolean purgeOnUninstall,
115                                      boolean meteringEnabled) {
116         this.name = checkNotNull(name, "map name cannot be null");
117         this.applicationId = applicationId;
118         this.database = checkNotNull(database, "database cannot be null");
119         this.serializer = checkNotNull(serializer, "serializer cannot be null");
120         this.readOnly = readOnly;
121         this.purgeOnUninstall = purgeOnUninstall;
122         this.database.registerConsumer(update -> {
123             SharedExecutors.getSingleThreadExecutor().execute(() -> {
124                 if (listeners.isEmpty()) {
125                     return;
126                 }
127                 try {
128                     if (update.target() == MAP_UPDATE) {
129                         Result<UpdateResult<String, byte[]>> result = update.output();
130                         if (result.success() && result.value().mapName().equals(name)) {
131                             MapEvent<K, V> mapEvent = result.value()
132                                                             .<K, V>map(this::dK,
133                                                                        v -> serializer.decode(Tools.copyOf(v)))
134                                                             .toMapEvent();
135                             notifyListeners(mapEvent);
136                         }
137                     } else if (update.target() == TX_COMMIT) {
138                         CommitResponse response = update.output();
139                         if (response.success()) {
140                             response.updates().forEach(u -> {
141                                 if (u.mapName().equals(name)) {
142                                     MapEvent<K, V> mapEvent =
143                                             u.<K, V>map(this::dK,
144                                                         v -> serializer.decode(Tools.copyOf(v)))
145                                              .toMapEvent();
146                                     notifyListeners(mapEvent);
147                                 }
148                             });
149                         }
150                     }
151                 } catch (Exception e) {
152                     log.warn("Error notifying listeners", e);
153                 }
154             });
155         });
156         this.monitor = new MeteringAgent(PRIMITIVE_NAME, name, meteringEnabled);
157     }
158
159     /**
160      * Returns this map name.
161      * @return map name
162      */
163     public String name() {
164         return name;
165     }
166
167     /**
168      * Returns the serializer for map entries.
169      * @return map entry serializer
170      */
171     public Serializer serializer() {
172         return serializer;
173     }
174
175     /**
176      * Returns the applicationId owning this map.
177      * @return application Id
178      */
179     public ApplicationId applicationId() {
180         return applicationId;
181     }
182
183     /**
184      * Returns whether the map entries should be purged when the application
185      * owning it is uninstalled.
186      * @return true is map needs to cleared on app uninstall; false otherwise
187      */
188     public boolean purgeOnUninstall() {
189         return purgeOnUninstall;
190     }
191
192     @Override
193     public CompletableFuture<Integer> size() {
194         final MeteringAgent.Context timer = monitor.startTimer(SIZE);
195         return database.mapSize(name)
196                 .whenComplete((r, e) -> timer.stop(e));
197     }
198
199     @Override
200     public CompletableFuture<Boolean> isEmpty() {
201         final MeteringAgent.Context timer = monitor.startTimer(IS_EMPTY);
202         return database.mapIsEmpty(name)
203                 .whenComplete((r, e) -> timer.stop(e));
204     }
205
206     @Override
207     public CompletableFuture<Boolean> containsKey(K key) {
208         checkNotNull(key, ERROR_NULL_KEY);
209         final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_KEY);
210         return database.mapContainsKey(name, keyCache.getUnchecked(key))
211                 .whenComplete((r, e) -> timer.stop(e));
212     }
213
214     @Override
215     public CompletableFuture<Boolean> containsValue(V value) {
216         checkNotNull(value, ERROR_NULL_VALUE);
217         final MeteringAgent.Context timer = monitor.startTimer(CONTAINS_VALUE);
218         return database.mapContainsValue(name, serializer.encode(value))
219                 .whenComplete((r, e) -> timer.stop(e));
220     }
221
222     @Override
223     public CompletableFuture<Versioned<V>> get(K key) {
224         checkNotNull(key, ERROR_NULL_KEY);
225         final MeteringAgent.Context timer = monitor.startTimer(GET);
226         return database.mapGet(name, keyCache.getUnchecked(key))
227                 .whenComplete((r, e) -> timer.stop(e))
228                 .thenApply(v -> v != null ? v.map(serializer::decode) : null);
229     }
230
231     @Override
232     public CompletableFuture<Versioned<V>> computeIfAbsent(K key,
233                                                            Function<? super K, ? extends V> mappingFunction) {
234         checkNotNull(key, ERROR_NULL_KEY);
235         checkNotNull(mappingFunction, "Mapping function cannot be null");
236         final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF_ABSENT);
237         return updateAndGet(key, Match.ifNull(), Match.any(), mappingFunction.apply(key))
238                 .whenComplete((r, e) -> timer.stop(e))
239                 .thenApply(v -> v.newValue());
240     }
241
242     @Override
243     public CompletableFuture<Versioned<V>> computeIfPresent(K key,
244                             BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
245         return computeIf(key, Objects::nonNull, remappingFunction);
246     }
247
248     @Override
249     public CompletableFuture<Versioned<V>> compute(K key,
250                                                    BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
251         return computeIf(key, v -> true, remappingFunction);
252     }
253
254     @Override
255     public CompletableFuture<Versioned<V>> computeIf(K key,
256                                                      Predicate<? super V> condition,
257                                                      BiFunction<? super K, ? super V, ? extends V> remappingFunction) {
258         checkNotNull(key, ERROR_NULL_KEY);
259         checkNotNull(condition, "predicate function cannot be null");
260         checkNotNull(remappingFunction, "Remapping function cannot be null");
261         final MeteringAgent.Context timer = monitor.startTimer(COMPUTE_IF);
262         return get(key).thenCompose(r1 -> {
263             V existingValue = r1 == null ? null : r1.value();
264             // if the condition evaluates to false, return existing value.
265             if (!condition.test(existingValue)) {
266                 return CompletableFuture.completedFuture(r1);
267             }
268
269             AtomicReference<V> computedValue = new AtomicReference<>();
270             // if remappingFunction throws an exception, return the exception.
271             try {
272                 computedValue.set(remappingFunction.apply(key, existingValue));
273             } catch (Exception e) {
274                 return Tools.exceptionalFuture(e);
275             }
276             if (computedValue.get() == null && r1 == null) {
277                 return CompletableFuture.completedFuture(null);
278             }
279             Match<V> valueMatcher = r1 == null ? Match.ifNull() : Match.any();
280             Match<Long> versionMatcher = r1 == null ? Match.any() : Match.ifValue(r1.version());
281             return updateAndGet(key, valueMatcher, versionMatcher, computedValue.get())
282                     .whenComplete((r, e) -> timer.stop(e))
283                     .thenApply(v -> {
284                         if (v.updated()) {
285                             return v.newValue();
286                         } else {
287                             throw new ConsistentMapException.ConcurrentModification();
288                         }
289                     });
290         });
291     }
292
293     @Override
294     public CompletableFuture<Versioned<V>> put(K key, V value) {
295         checkNotNull(key, ERROR_NULL_KEY);
296         checkNotNull(value, ERROR_NULL_VALUE);
297         final MeteringAgent.Context timer = monitor.startTimer(PUT);
298         return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.oldValue())
299                 .whenComplete((r, e) -> timer.stop(e));
300     }
301
302     @Override
303     public CompletableFuture<Versioned<V>> putAndGet(K key, V value) {
304         checkNotNull(key, ERROR_NULL_KEY);
305         checkNotNull(value, ERROR_NULL_VALUE);
306         final MeteringAgent.Context timer = monitor.startTimer(PUT_AND_GET);
307         return updateAndGet(key, Match.any(), Match.any(), value).thenApply(v -> v.newValue())
308                 .whenComplete((r, e) -> timer.stop(e));
309     }
310
311     @Override
312     public CompletableFuture<Versioned<V>> remove(K key) {
313         checkNotNull(key, ERROR_NULL_KEY);
314         final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
315         return updateAndGet(key, Match.any(), Match.any(), null).thenApply(v -> v.oldValue())
316                 .whenComplete((r, e) -> timer.stop(e));
317     }
318
319     @Override
320     public CompletableFuture<Void> clear() {
321         checkIfUnmodifiable();
322         final MeteringAgent.Context timer = monitor.startTimer(CLEAR);
323         return database.mapClear(name).thenApply(this::unwrapResult)
324                 .whenComplete((r, e) -> timer.stop(e));
325     }
326
327     @Override
328     public CompletableFuture<Set<K>> keySet() {
329         final MeteringAgent.Context timer = monitor.startTimer(KEY_SET);
330         return database.mapKeySet(name)
331                 .thenApply(s -> s
332                         .stream()
333                         .map(this::dK)
334                         .collect(Collectors.toSet()))
335                 .whenComplete((r, e) -> timer.stop(e));
336     }
337
338     @Override
339     public CompletableFuture<Collection<Versioned<V>>> values() {
340         final MeteringAgent.Context timer = monitor.startTimer(VALUES);
341         return database.mapValues(name)
342                 .whenComplete((r, e) -> timer.stop(e))
343                 .thenApply(c -> c
344                         .stream()
345                         .map(v -> v.<V>map(serializer::decode))
346                         .collect(Collectors.toList()));
347     }
348
349     @Override
350     public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
351         final MeteringAgent.Context timer = monitor.startTimer(ENTRY_SET);
352         return database.mapEntrySet(name)
353                 .whenComplete((r, e) -> timer.stop(e))
354                 .thenApply(s -> s
355                         .stream()
356                         .map(this::mapRawEntry)
357                         .collect(Collectors.toSet()));
358     }
359
360     @Override
361     public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
362         checkNotNull(key, ERROR_NULL_KEY);
363         checkNotNull(value, ERROR_NULL_VALUE);
364         final MeteringAgent.Context timer = monitor.startTimer(PUT_IF_ABSENT);
365         return updateAndGet(key, Match.ifNull(), Match.any(), value)
366                 .whenComplete((r, e) -> timer.stop(e))
367                 .thenApply(v -> v.oldValue());
368     }
369
370     @Override
371     public CompletableFuture<Boolean> remove(K key, V value) {
372         checkNotNull(key, ERROR_NULL_KEY);
373         checkNotNull(value, ERROR_NULL_VALUE);
374         final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
375         return updateAndGet(key, Match.ifValue(value), Match.any(), null)
376                 .whenComplete((r, e) -> timer.stop(e))
377                 .thenApply(v -> v.updated());
378     }
379
380     @Override
381     public CompletableFuture<Boolean> remove(K key, long version) {
382         checkNotNull(key, ERROR_NULL_KEY);
383         final MeteringAgent.Context timer = monitor.startTimer(REMOVE);
384         return updateAndGet(key, Match.any(), Match.ifValue(version), null)
385                 .whenComplete((r, e) -> timer.stop(e))
386                 .thenApply(v -> v.updated());
387     }
388
389     @Override
390     public CompletableFuture<Boolean> replace(K key, V oldValue, V newValue) {
391         checkNotNull(key, ERROR_NULL_KEY);
392         checkNotNull(oldValue, ERROR_NULL_VALUE);
393         checkNotNull(newValue, ERROR_NULL_VALUE);
394         final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
395         return updateAndGet(key, Match.ifValue(oldValue), Match.any(), newValue)
396                 .whenComplete((r, e) -> timer.stop(e))
397                 .thenApply(v -> v.updated());
398     }
399
400     @Override
401     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
402         final MeteringAgent.Context timer = monitor.startTimer(REPLACE);
403         return updateAndGet(key, Match.any(), Match.ifValue(oldVersion), newValue)
404                 .whenComplete((r, e) -> timer.stop(e))
405                 .thenApply(v -> v.updated());
406     }
407
408     /**
409      * Pre-update hook for performing required checks/actions before going forward with an update operation.
410      * @param key map key.
411      */
412     protected void beforeUpdate(K key) {
413         checkIfUnmodifiable();
414     }
415
416     private Map.Entry<K, Versioned<V>> mapRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
417         return Maps.immutableEntry(dK(e.getKey()), e.getValue().<V>map(serializer::decode));
418     }
419
420     private CompletableFuture<UpdateResult<K, V>> updateAndGet(K key,
421                                                                Match<V> oldValueMatch,
422                                                                Match<Long> oldVersionMatch,
423                                                                V value) {
424         beforeUpdate(key);
425         return database.mapUpdate(name,
426                 keyCache.getUnchecked(key),
427                 oldValueMatch.map(serializer::encode),
428                 oldVersionMatch,
429                 value == null ? null : serializer.encode(value))
430                 .thenApply(this::unwrapResult)
431                 .thenApply(r -> r.<K, V>map(this::dK, serializer::decode));
432     }
433
434     private <T> T unwrapResult(Result<T> result) {
435         if (result.status() == Result.Status.LOCKED) {
436             throw new ConsistentMapException.ConcurrentModification();
437         } else if (result.success()) {
438             return result.value();
439         } else {
440             throw new IllegalStateException("Must not be here");
441         }
442     }
443
444     private void checkIfUnmodifiable() {
445         if (readOnly) {
446             throw new UnsupportedOperationException();
447         }
448     }
449
450     @Override
451     public void addListener(MapEventListener<K, V> listener) {
452         listeners.add(listener);
453     }
454
455     @Override
456     public void removeListener(MapEventListener<K, V> listener) {
457         listeners.remove(listener);
458     }
459
460     protected void notifyListeners(MapEvent<K, V> event) {
461         if (event == null) {
462             return;
463         }
464         listeners.forEach(listener -> {
465             try {
466                 listener.event(event);
467             } catch (Exception e) {
468                 log.warn("Failure notifying listener about {}", event, e);
469             }
470         });
471     }
472
473 }