2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.ecmap;
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.onlab.util.AbstractAccumulator;
27 import org.onlab.util.KryoNamespace;
28 import org.onlab.util.SlidingWindowCounter;
29 import org.onosproject.cluster.ClusterService;
30 import org.onosproject.cluster.ControllerNode;
31 import org.onosproject.cluster.NodeId;
32 import org.onosproject.store.Timestamp;
33 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34 import org.onosproject.store.cluster.messaging.MessageSubject;
35 import org.onosproject.store.impl.LogicalTimestamp;
36 import org.onosproject.store.service.WallClockTimestamp;
37 import org.onosproject.store.serializers.KryoNamespaces;
38 import org.onosproject.store.serializers.KryoSerializer;
39 import org.onosproject.store.service.EventuallyConsistentMap;
40 import org.onosproject.store.service.EventuallyConsistentMapEvent;
41 import org.onosproject.store.service.EventuallyConsistentMapListener;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
45 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
46 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.List;
52 import java.util.Objects;
53 import java.util.Optional;
55 import java.util.Timer;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.atomic.AtomicBoolean;
61 import java.util.concurrent.atomic.AtomicReference;
62 import java.util.function.BiFunction;
63 import java.util.stream.Collectors;
65 import static com.google.common.base.Preconditions.checkNotNull;
66 import static com.google.common.base.Preconditions.checkState;
67 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
68 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69 import static org.onlab.util.Tools.groupedThreads;
72 * Distributed Map implementation which uses optimistic replication and gossip
73 * based techniques to provide an eventually consistent data store.
75 public class EventuallyConsistentMapImpl<K, V>
76 implements EventuallyConsistentMap<K, V> {
78 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
// Backing store. Entries can be live values or tombstones; readers filter
// with MapValue::isAlive / isTombstone (see get(), keySet(), values()).
80 private final Map<K, MapValue<V>> items;
82 private final ClusterService clusterService;
83 private final ClusterCommunicationService clusterCommunicator;
// Built by createSerializer() from the caller-supplied namespace builder;
// also handed to the persistent store when persistence is enabled.
84 private final KryoSerializer serializer;
// Id of the local controller node, used to exclude ourselves from peer lists.
85 private final NodeId localNodeId;
// Supplies the timestamp attached to each local write (put / removeAndNotify / compute).
87 private final BiFunction<K, V, Timestamp> timestampProvider;
// Message subjects registered with clusterCommunicator in the constructor:
// "ecm-<mapName>-update" and "ecm-<mapName>-anti-entropy".
89 private final MessageSubject updateMessageSubject;
90 private final MessageSubject antiEntropyAdvertisementSubject;
// Listeners invoked by notifyListeners(); copy-on-write set.
92 private final Set<EventuallyConsistentMapListener<K, V>> listeners
93 = Sets.newCopyOnWriteArraySet();
// Per the constructor comment, this executor is used for receiving messages.
95 private final ExecutorService executor;
// Runs the periodic sendAdvertisement task and is the executor registered
// for incoming anti-entropy advertisements.
96 private final ScheduledExecutorService backgroundExecutor;
// Selects the peers that receive an update for a given key/value. When the
// caller passes null the constructor installs a function returning every
// cluster node except the local one.
97 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
// Used by EventAccumulator.processItems to submit outgoing unicast sends.
99 private final ExecutorService communicationExecutor;
// One EventAccumulator per peer, created lazily in queueUpdate().
100 private final Map<NodeId, EventAccumulator> senderPending;
102 private final String mapName;
// Checked via checkState(!destroyed, ...) at the top of the public operations.
104 private volatile boolean destroyed = false;
105 private static final String ERROR_DESTROYED = " map is already destroyed";
// mapName + ERROR_DESTROYED, precomputed in the constructor.
106 private final String destroyedMessage;
108 private static final String ERROR_NULL_KEY = "Key cannot be null";
109 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
// Initial delay passed to scheduleAtFixedRate for the anti-entropy task.
// NOTE(review): it is passed together with antiEntropyTimeUnit, so despite
// the "Sec" suffix it is interpreted in whatever unit the caller supplies.
111 private final long initialDelaySec = 5;
// Set to !convergeFaster. When false, handleAntiEntropyAdvertisement answers
// with its own advertisement if the peer advertises keys missing locally.
112 private final boolean lightweightAntiEntropy;
// When true, removeAndNotify() does not create a tombstone for removals.
113 private final boolean tombstonesDisabled;
// Load tracking: counter is bumped by putInternal/removeInternal, and
// underHighLoad() reports counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD.
// NOTE(review): the time unit of the window slots is defined by
// SlidingWindowCounter and is not visible here.
115 private static final int WINDOW_SIZE = 5;
116 private static final int HIGH_LOAD_THRESHOLD = 0;
117 private static final int LOAD_WINDOW = 2;
118 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
// persistentStore is assigned a MapDbPersistentStore when persistent is true
// and null otherwise, so it must only be dereferenced when persistent is set.
120 private final boolean persistent;
121 private final PersistentStore<K, V> persistentStore;
124 * Creates a new eventually consistent map shared amongst multiple instances.
126 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
127 * for more description of the parameters expected by the map.
130 * @param mapName a String identifier for the map.
131 * @param clusterService the cluster service
132 * @param clusterCommunicator the cluster communications service
133 * @param serializerBuilder a Kryo namespace builder that can serialize
135 * @param timestampProvider provider of timestamps for K and V
136 * @param peerUpdateFunction function that provides a set of nodes to immediately
137 * update to when there are writes to the map
138 * @param eventExecutor executor to use for processing incoming
140 * @param communicationExecutor executor to use for sending events to peers
141 * @param backgroundExecutor executor to use for background anti-entropy
143 * @param tombstonesDisabled true if this map should not maintain
145 * @param antiEntropyPeriod period that the anti-entropy task should run
146 * @param antiEntropyTimeUnit time unit for anti-entropy period
147 * @param convergeFaster make anti-entropy try to converge faster
148 * @param persistent persist data to disk
150 EventuallyConsistentMapImpl(String mapName,
151 ClusterService clusterService,
152 ClusterCommunicationService clusterCommunicator,
153 KryoNamespace.Builder serializerBuilder,
154 BiFunction<K, V, Timestamp> timestampProvider,
155 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
156 ExecutorService eventExecutor,
157 ExecutorService communicationExecutor,
158 ScheduledExecutorService backgroundExecutor,
159 boolean tombstonesDisabled,
160 long antiEntropyPeriod,
161 TimeUnit antiEntropyTimeUnit,
162 boolean convergeFaster,
163 boolean persistent) {
164 this.mapName = mapName;
165 items = Maps.newConcurrentMap();
166 senderPending = Maps.newConcurrentMap();
167 destroyedMessage = mapName + ERROR_DESTROYED;
169 this.clusterService = clusterService;
170 this.clusterCommunicator = clusterCommunicator;
171 this.localNodeId = clusterService.getLocalNode().id();
173 this.serializer = createSerializer(serializerBuilder);
175 this.timestampProvider = timestampProvider;
177 if (peerUpdateFunction != null) {
178 this.peerUpdateFunction = peerUpdateFunction;
180 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
181 .map(ControllerNode::id)
182 .filter(nodeId -> !nodeId.equals(localNodeId))
183 .collect(Collectors.toList());
186 if (eventExecutor != null) {
187 this.executor = eventExecutor;
189 // should be a normal executor; it's used for receiving messages
191 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
194 if (communicationExecutor != null) {
195 this.communicationExecutor = communicationExecutor;
197 // sending executor; should be capped
198 //TODO this probably doesn't need to be bounded anymore
199 this.communicationExecutor =
200 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
203 this.persistent = persistent;
205 if (this.persistent) {
206 String dataDirectory = System.getProperty("karaf.data", "./data");
207 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
209 ExecutorService dbExecutor =
210 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
212 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
213 persistentStore.readInto(items);
215 this.persistentStore = null;
218 if (backgroundExecutor != null) {
219 this.backgroundExecutor = backgroundExecutor;
221 this.backgroundExecutor =
222 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
225 // start anti-entropy thread
226 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
227 initialDelaySec, antiEntropyPeriod,
228 antiEntropyTimeUnit);
230 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
231 clusterCommunicator.addSubscriber(updateMessageSubject,
233 this::processUpdates,
236 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
237 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
239 this::handleAntiEntropyAdvertisement,
240 this.backgroundExecutor);
242 this.tombstonesDisabled = tombstonesDisabled;
243 this.lightweightAntiEntropy = !convergeFaster;
246 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
247 return new KryoSerializer() {
249 protected void setupKryoPool() {
250 // Add the map's internal helper classes to the user-supplied serializer
251 serializerPool = builder
252 .register(KryoNamespaces.BASIC)
253 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
254 .register(LogicalTimestamp.class)
255 .register(WallClockTimestamp.class)
256 .register(AntiEntropyAdvertisement.class)
257 .register(UpdateEntry.class)
258 .register(MapValue.class)
259 .register(MapValue.Digest.class)
267 checkState(!destroyed, destroyedMessage);
268 // TODO: Maintain a separate counter for tracking live elements in map.
269 return Maps.filterValues(items, MapValue::isAlive).size();
273 public boolean isEmpty() {
274 checkState(!destroyed, destroyedMessage);
279 public boolean containsKey(K key) {
280 checkState(!destroyed, destroyedMessage);
281 checkNotNull(key, ERROR_NULL_KEY);
282 return get(key) != null;
286 public boolean containsValue(V value) {
287 checkState(!destroyed, destroyedMessage);
288 checkNotNull(value, ERROR_NULL_VALUE);
289 return items.values()
291 .filter(MapValue::isAlive)
292 .anyMatch(v -> value.equals(v.get()));
296 public V get(K key) {
297 checkState(!destroyed, destroyedMessage);
298 checkNotNull(key, ERROR_NULL_KEY);
300 MapValue<V> value = items.get(key);
301 return (value == null || value.isTombstone()) ? null : value.get();
305 public void put(K key, V value) {
306 checkState(!destroyed, destroyedMessage);
307 checkNotNull(key, ERROR_NULL_KEY);
308 checkNotNull(value, ERROR_NULL_VALUE);
310 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
311 if (putInternal(key, newValue)) {
312 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
313 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
318 public V remove(K key) {
319 checkState(!destroyed, destroyedMessage);
320 checkNotNull(key, ERROR_NULL_KEY);
321 return removeAndNotify(key, null);
325 public void remove(K key, V value) {
326 checkState(!destroyed, destroyedMessage);
327 checkNotNull(key, ERROR_NULL_KEY);
328 checkNotNull(value, ERROR_NULL_VALUE);
329 removeAndNotify(key, value);
332 private V removeAndNotify(K key, V value) {
333 Timestamp timestamp = timestampProvider.apply(key, value);
334 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
335 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
336 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
337 if (previousValue != null) {
338 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
339 peerUpdateFunction.apply(key, previousValue.get()));
340 if (previousValue.isAlive()) {
341 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
344 return previousValue != null ? previousValue.get() : null;
347 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
348 checkState(!destroyed, destroyedMessage);
349 checkNotNull(key, ERROR_NULL_KEY);
350 checkNotNull(value, ERROR_NULL_VALUE);
351 tombstone.ifPresent(v -> checkState(v.isTombstone()));
353 counter.incrementCount();
354 AtomicBoolean updated = new AtomicBoolean(false);
355 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
356 items.compute(key, (k, existing) -> {
357 boolean valueMatches = true;
358 if (value.isPresent() && existing != null && existing.isAlive()) {
359 valueMatches = Objects.equals(value.get(), existing.get());
361 if (existing == null) {
362 log.debug("ECMap Remove: Existing value for key {} is already null", k);
365 if (existing == null) {
366 updated.set(tombstone.isPresent());
368 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
372 previousValue.set(existing);
373 return tombstone.orElse(null);
380 if (tombstone.isPresent()) {
381 persistentStore.update(key, tombstone.get());
383 persistentStore.remove(key);
387 return previousValue.get();
391 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
392 checkState(!destroyed, destroyedMessage);
393 checkNotNull(key, ERROR_NULL_KEY);
394 checkNotNull(recomputeFunction, "Recompute function cannot be null");
396 AtomicBoolean updated = new AtomicBoolean(false);
397 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
398 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
399 previousValue.set(mv);
400 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
401 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
402 if (mv == null || newValue.isNewerThan(mv)) {
410 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
411 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
412 V value = computedValue.isTombstone()
413 ? previousValue.get() == null ? null : previousValue.get().get()
414 : computedValue.get();
416 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
419 return computedValue.get();
423 public void putAll(Map<? extends K, ? extends V> m) {
424 checkState(!destroyed, destroyedMessage);
425 m.forEach(this::put);
429 public void clear() {
430 checkState(!destroyed, destroyedMessage);
431 Maps.filterValues(items, MapValue::isAlive)
432 .forEach((k, v) -> remove(k));
436 public Set<K> keySet() {
437 checkState(!destroyed, destroyedMessage);
438 return Maps.filterValues(items, MapValue::isAlive)
443 public Collection<V> values() {
444 checkState(!destroyed, destroyedMessage);
445 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
449 public Set<Map.Entry<K, V>> entrySet() {
450 checkState(!destroyed, destroyedMessage);
451 return Maps.filterValues(items, MapValue::isAlive)
454 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
455 .collect(Collectors.toSet());
459 * Returns true if newValue was accepted i.e. map is updated.
461 * @param newValue proposed new value
462 * @return true if update happened; false if map already contains a more recent value for the key
464 private boolean putInternal(K key, MapValue<V> newValue) {
465 checkState(!destroyed, destroyedMessage);
466 checkNotNull(key, ERROR_NULL_KEY);
467 checkNotNull(newValue, ERROR_NULL_VALUE);
468 checkState(newValue.isAlive());
469 counter.incrementCount();
470 AtomicBoolean updated = new AtomicBoolean(false);
471 items.compute(key, (k, existing) -> {
472 if (existing == null || newValue.isNewerThan(existing)) {
478 if (updated.get() && persistent) {
479 persistentStore.update(key, newValue);
481 return updated.get();
485 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
486 checkState(!destroyed, destroyedMessage);
488 listeners.add(checkNotNull(listener));
492 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
493 checkState(!destroyed, destroyedMessage);
495 listeners.remove(checkNotNull(listener));
499 public void destroy() {
503 backgroundExecutor.shutdown();
504 communicationExecutor.shutdown();
508 clusterCommunicator.removeSubscriber(updateMessageSubject);
509 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
512 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
513 listeners.forEach(listener -> listener.event(event));
516 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
517 queueUpdate(event, peers);
520 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
522 // we have no friends :(
525 peers.forEach(node ->
526 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
530 private boolean underHighLoad() {
531 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
534 private void sendAdvertisement() {
536 if (underHighLoad() || destroyed) {
539 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
540 } catch (Exception e) {
541 // Catch all exceptions to avoid scheduled task being suppressed.
542 log.error("Exception thrown while sending advertisement", e);
546 private Optional<NodeId> pickRandomActivePeer() {
547 List<NodeId> activePeers = clusterService.getNodes()
549 .map(ControllerNode::id)
550 .filter(id -> !localNodeId.equals(id))
551 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
552 .collect(Collectors.toList());
553 Collections.shuffle(activePeers);
554 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
557 private void sendAdvertisementToPeer(NodeId peer) {
558 clusterCommunicator.unicast(createAdvertisement(),
559 antiEntropyAdvertisementSubject,
562 .whenComplete((result, error) -> {
564 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
569 private AntiEntropyAdvertisement<K> createAdvertisement() {
570 return new AntiEntropyAdvertisement<K>(localNodeId,
571 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
574 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
575 if (destroyed || underHighLoad()) {
579 log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
580 mapName, ad.sender(), ad.digest().size());
581 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
583 if (!lightweightAntiEntropy) {
584 // if remote ad has any entries that the local copy is missing, actively sync
585 // TODO: Missing keys is not the way local copy can be behind.
586 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
587 // TODO: Send ad for missing keys and for entries that are stale
588 sendAdvertisementToPeer(ad.sender());
591 } catch (Exception e) {
592 log.warn("Error handling anti-entropy advertisement", e);
597 * Processes anti-entropy ad from peer by taking following actions:
598 * 1. If peer has an old entry, updates peer.
599 * 2. If peer indicates an entry is removed and has a more recent
600 * timestamp than the local entry, update local state.
602 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
603 AntiEntropyAdvertisement<K> ad) {
604 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
605 final NodeId sender = ad.sender();
606 items.forEach((key, localValue) -> {
607 MapValue.Digest remoteValueDigest = ad.digest().get(key);
608 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
609 // local value is more recent, push to sender
610 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
612 if (remoteValueDigest != null
613 && remoteValueDigest.isNewerThan(localValue.digest())
614 && remoteValueDigest.isTombstone()) {
615 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
616 MapValue<V> previousValue = removeInternal(key,
618 Optional.of(tombstone));
619 if (previousValue != null && previousValue.isAlive()) {
620 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
624 return externalEvents;
627 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
631 updates.forEach(update -> {
632 final K key = update.key();
633 final MapValue<V> value = update.value();
634 if (value == null || value.isTombstone()) {
635 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
636 if (previousValue != null && previousValue.isAlive()) {
637 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
639 } else if (putInternal(key, value)) {
640 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
// Settings for the per-peer EventAccumulator, which passes them to its
// superclass constructor as (TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS,
// DEFAULT_MAX_IDLE_MS).
// NOTE(review): exact batching semantics are defined by AbstractAccumulator,
// not visible here; the names suggest a max batch size and two time limits in
// milliseconds.
645 // TODO pull this into the class if this gets pulled out...
646 private static final int DEFAULT_MAX_EVENTS = 1000;
647 private static final int DEFAULT_MAX_IDLE_MS = 10;
648 private static final int DEFAULT_MAX_BATCH_MS = 50;
// Static, so a single java.util.Timer is shared by every accumulator this
// class creates. NOTE(review): Timer(String) starts a non-daemon thread.
649 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
651 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
653 private final NodeId peer;
655 private EventAccumulator(NodeId peer) {
656 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
661 public void processItems(List<UpdateEntry<K, V>> items) {
662 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
663 items.forEach(item -> map.compute(item.key(), (key, existing) ->
664 item.isNewerThan(existing) ? item : existing));
665 communicationExecutor.submit(() -> {
666 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
667 updateMessageSubject,
670 .whenComplete((result, error) -> {
672 log.debug("Failed to send to {}", peer, error);