2 * Copyright 2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.ecmap;
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.onlab.util.AbstractAccumulator;
26 import org.onlab.util.KryoNamespace;
27 import org.onlab.util.SlidingWindowCounter;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.ControllerNode;
30 import org.onosproject.cluster.NodeId;
31 import org.onosproject.store.Timestamp;
32 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33 import org.onosproject.store.cluster.messaging.MessageSubject;
34 import org.onosproject.store.impl.LogicalTimestamp;
35 import org.onosproject.store.serializers.KryoNamespaces;
36 import org.onosproject.store.serializers.KryoSerializer;
37 import org.onosproject.store.service.EventuallyConsistentMap;
38 import org.onosproject.store.service.EventuallyConsistentMapEvent;
39 import org.onosproject.store.service.EventuallyConsistentMapListener;
40 import org.onosproject.store.service.WallClockTimestamp;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.List;
48 import java.util.Objects;
49 import java.util.Optional;
51 import java.util.Timer;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors;
54 import java.util.concurrent.ScheduledExecutorService;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.atomic.AtomicBoolean;
57 import java.util.concurrent.atomic.AtomicReference;
58 import java.util.function.BiFunction;
59 import java.util.stream.Collectors;
61 import static com.google.common.base.Preconditions.checkNotNull;
62 import static com.google.common.base.Preconditions.checkState;
63 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
64 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
65 import static org.onlab.util.Tools.groupedThreads;
66 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
67 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
70 * Distributed Map implementation which uses optimistic replication and gossip
71 * based techniques to provide an eventually consistent data store.
73 public class EventuallyConsistentMapImpl<K, V>
74 implements EventuallyConsistentMap<K, V> {
76 private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
78 private final Map<K, MapValue<V>> items;
80 private final ClusterService clusterService;
81 private final ClusterCommunicationService clusterCommunicator;
82 private final KryoSerializer serializer;
83 private final NodeId localNodeId;
85 private final BiFunction<K, V, Timestamp> timestampProvider;
87 private final MessageSubject updateMessageSubject;
88 private final MessageSubject antiEntropyAdvertisementSubject;
90 private final Set<EventuallyConsistentMapListener<K, V>> listeners
91 = Sets.newCopyOnWriteArraySet();
93 private final ExecutorService executor;
94 private final ScheduledExecutorService backgroundExecutor;
95 private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
97 private final ExecutorService communicationExecutor;
98 private final Map<NodeId, EventAccumulator> senderPending;
100 private final String mapName;
102 private volatile boolean destroyed = false;
103 private static final String ERROR_DESTROYED = " map is already destroyed";
104 private final String destroyedMessage;
106 private static final String ERROR_NULL_KEY = "Key cannot be null";
107 private static final String ERROR_NULL_VALUE = "Null values are not allowed";
109 private final long initialDelaySec = 5;
110 private final boolean lightweightAntiEntropy;
111 private final boolean tombstonesDisabled;
113 private static final int WINDOW_SIZE = 5;
114 private static final int HIGH_LOAD_THRESHOLD = 0;
115 private static final int LOAD_WINDOW = 2;
116 private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
118 private final boolean persistent;
119 private final PersistentStore<K, V> persistentStore;
122 * Creates a new eventually consistent map shared amongst multiple instances.
124 * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
125 * for more description of the parameters expected by the map.
128 * @param mapName a String identifier for the map.
129 * @param clusterService the cluster service
130 * @param clusterCommunicator the cluster communications service
131 * @param serializerBuilder a Kryo namespace builder that can serialize
133 * @param timestampProvider provider of timestamps for K and V
134 * @param peerUpdateFunction function that provides a set of nodes to immediately
135 * update to when there writes to the map
136 * @param eventExecutor executor to use for processing incoming
138 * @param communicationExecutor executor to use for sending events to peers
139 * @param backgroundExecutor executor to use for background anti-entropy
141 * @param tombstonesDisabled true if this map should not maintain
143 * @param antiEntropyPeriod period that the anti-entropy task should run
144 * @param antiEntropyTimeUnit time unit for anti-entropy period
145 * @param convergeFaster make anti-entropy try to converge faster
146 * @param persistent persist data to disk
148 EventuallyConsistentMapImpl(String mapName,
149 ClusterService clusterService,
150 ClusterCommunicationService clusterCommunicator,
151 KryoNamespace.Builder serializerBuilder,
152 BiFunction<K, V, Timestamp> timestampProvider,
153 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
154 ExecutorService eventExecutor,
155 ExecutorService communicationExecutor,
156 ScheduledExecutorService backgroundExecutor,
157 boolean tombstonesDisabled,
158 long antiEntropyPeriod,
159 TimeUnit antiEntropyTimeUnit,
160 boolean convergeFaster,
161 boolean persistent) {
162 this.mapName = mapName;
163 items = Maps.newConcurrentMap();
164 senderPending = Maps.newConcurrentMap();
165 destroyedMessage = mapName + ERROR_DESTROYED;
167 this.clusterService = clusterService;
168 this.clusterCommunicator = clusterCommunicator;
169 this.localNodeId = clusterService.getLocalNode().id();
171 this.serializer = createSerializer(serializerBuilder);
173 this.timestampProvider = timestampProvider;
175 if (peerUpdateFunction != null) {
176 this.peerUpdateFunction = peerUpdateFunction;
178 this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
179 .map(ControllerNode::id)
180 .filter(nodeId -> !nodeId.equals(localNodeId))
181 .collect(Collectors.toList());
184 if (eventExecutor != null) {
185 this.executor = eventExecutor;
187 // should be a normal executor; it's used for receiving messages
189 Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
192 if (communicationExecutor != null) {
193 this.communicationExecutor = communicationExecutor;
195 // sending executor; should be capped
196 //TODO this probably doesn't need to be bounded anymore
197 this.communicationExecutor =
198 newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
201 this.persistent = persistent;
203 if (this.persistent) {
204 String dataDirectory = System.getProperty("karaf.data", "./data");
205 String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
207 ExecutorService dbExecutor =
208 newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
210 persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
211 persistentStore.readInto(items);
213 this.persistentStore = null;
216 if (backgroundExecutor != null) {
217 this.backgroundExecutor = backgroundExecutor;
219 this.backgroundExecutor =
220 newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
223 // start anti-entropy thread
224 this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
225 initialDelaySec, antiEntropyPeriod,
226 antiEntropyTimeUnit);
228 updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
229 clusterCommunicator.addSubscriber(updateMessageSubject,
231 this::processUpdates,
234 antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
235 clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
237 this::handleAntiEntropyAdvertisement,
238 this.backgroundExecutor);
240 this.tombstonesDisabled = tombstonesDisabled;
241 this.lightweightAntiEntropy = !convergeFaster;
244 private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
245 return new KryoSerializer() {
247 protected void setupKryoPool() {
248 // Add the map's internal helper classes to the user-supplied serializer
249 serializerPool = builder
250 .register(KryoNamespaces.BASIC)
251 .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
252 .register(LogicalTimestamp.class)
253 .register(WallClockTimestamp.class)
254 .register(AntiEntropyAdvertisement.class)
255 .register(UpdateEntry.class)
256 .register(MapValue.class)
257 .register(MapValue.Digest.class)
265 checkState(!destroyed, destroyedMessage);
266 // TODO: Maintain a separate counter for tracking live elements in map.
267 return Maps.filterValues(items, MapValue::isAlive).size();
271 public boolean isEmpty() {
272 checkState(!destroyed, destroyedMessage);
277 public boolean containsKey(K key) {
278 checkState(!destroyed, destroyedMessage);
279 checkNotNull(key, ERROR_NULL_KEY);
280 return get(key) != null;
284 public boolean containsValue(V value) {
285 checkState(!destroyed, destroyedMessage);
286 checkNotNull(value, ERROR_NULL_VALUE);
287 return items.values()
289 .filter(MapValue::isAlive)
290 .anyMatch(v -> value.equals(v.get()));
294 public V get(K key) {
295 checkState(!destroyed, destroyedMessage);
296 checkNotNull(key, ERROR_NULL_KEY);
298 MapValue<V> value = items.get(key);
299 return (value == null || value.isTombstone()) ? null : value.get();
303 public void put(K key, V value) {
304 checkState(!destroyed, destroyedMessage);
305 checkNotNull(key, ERROR_NULL_KEY);
306 checkNotNull(value, ERROR_NULL_VALUE);
308 MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
309 if (putInternal(key, newValue)) {
310 notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
311 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
316 public V remove(K key) {
317 checkState(!destroyed, destroyedMessage);
318 checkNotNull(key, ERROR_NULL_KEY);
319 return removeAndNotify(key, null);
323 public void remove(K key, V value) {
324 checkState(!destroyed, destroyedMessage);
325 checkNotNull(key, ERROR_NULL_KEY);
326 checkNotNull(value, ERROR_NULL_VALUE);
327 removeAndNotify(key, value);
330 private V removeAndNotify(K key, V value) {
331 Timestamp timestamp = timestampProvider.apply(key, value);
332 Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
333 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
334 MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
335 if (previousValue != null) {
336 notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
337 peerUpdateFunction.apply(key, previousValue.get()));
338 if (previousValue.isAlive()) {
339 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
342 return previousValue != null ? previousValue.get() : null;
345 private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
346 checkState(!destroyed, destroyedMessage);
347 checkNotNull(key, ERROR_NULL_KEY);
348 checkNotNull(value, ERROR_NULL_VALUE);
349 tombstone.ifPresent(v -> checkState(v.isTombstone()));
351 counter.incrementCount();
352 AtomicBoolean updated = new AtomicBoolean(false);
353 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
354 items.compute(key, (k, existing) -> {
355 boolean valueMatches = true;
356 if (value.isPresent() && existing != null && existing.isAlive()) {
357 valueMatches = Objects.equals(value.get(), existing.get());
359 if (existing == null) {
360 log.trace("ECMap Remove: Existing value for key {} is already null", k);
363 if (existing == null) {
364 updated.set(tombstone.isPresent());
366 updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
370 previousValue.set(existing);
371 return tombstone.orElse(null);
378 if (tombstone.isPresent()) {
379 persistentStore.update(key, tombstone.get());
381 persistentStore.remove(key);
385 return previousValue.get();
389 public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
390 checkState(!destroyed, destroyedMessage);
391 checkNotNull(key, ERROR_NULL_KEY);
392 checkNotNull(recomputeFunction, "Recompute function cannot be null");
394 AtomicBoolean updated = new AtomicBoolean(false);
395 AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
396 MapValue<V> computedValue = items.compute(key, (k, mv) -> {
397 previousValue.set(mv);
398 V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
399 MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
400 if (mv == null || newValue.isNewerThan(mv)) {
408 notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
409 EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
410 V value = computedValue.isTombstone()
411 ? previousValue.get() == null ? null : previousValue.get().get()
412 : computedValue.get();
414 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
417 return computedValue.get();
421 public void putAll(Map<? extends K, ? extends V> m) {
422 checkState(!destroyed, destroyedMessage);
423 m.forEach(this::put);
427 public void clear() {
428 checkState(!destroyed, destroyedMessage);
429 Maps.filterValues(items, MapValue::isAlive)
430 .forEach((k, v) -> remove(k));
434 public Set<K> keySet() {
435 checkState(!destroyed, destroyedMessage);
436 return Maps.filterValues(items, MapValue::isAlive)
441 public Collection<V> values() {
442 checkState(!destroyed, destroyedMessage);
443 return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
447 public Set<Map.Entry<K, V>> entrySet() {
448 checkState(!destroyed, destroyedMessage);
449 return Maps.filterValues(items, MapValue::isAlive)
452 .map(e -> Pair.of(e.getKey(), e.getValue().get()))
453 .collect(Collectors.toSet());
457 * Returns true if newValue was accepted i.e. map is updated.
459 * @param newValue proposed new value
460 * @return true if update happened; false if map already contains a more recent value for the key
462 private boolean putInternal(K key, MapValue<V> newValue) {
463 checkState(!destroyed, destroyedMessage);
464 checkNotNull(key, ERROR_NULL_KEY);
465 checkNotNull(newValue, ERROR_NULL_VALUE);
466 checkState(newValue.isAlive());
467 counter.incrementCount();
468 AtomicBoolean updated = new AtomicBoolean(false);
469 items.compute(key, (k, existing) -> {
470 if (existing == null || newValue.isNewerThan(existing)) {
476 if (updated.get() && persistent) {
477 persistentStore.update(key, newValue);
479 return updated.get();
483 public void addListener(EventuallyConsistentMapListener<K, V> listener) {
484 checkState(!destroyed, destroyedMessage);
486 listeners.add(checkNotNull(listener));
490 public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
491 checkState(!destroyed, destroyedMessage);
493 listeners.remove(checkNotNull(listener));
497 public void destroy() {
501 backgroundExecutor.shutdown();
502 communicationExecutor.shutdown();
506 clusterCommunicator.removeSubscriber(updateMessageSubject);
507 clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
510 private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
511 listeners.forEach(listener -> listener.event(event));
514 private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
515 queueUpdate(event, peers);
518 private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
520 // we have no friends :(
523 peers.forEach(node ->
524 senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
528 private boolean underHighLoad() {
529 return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
532 private void sendAdvertisement() {
534 if (underHighLoad() || destroyed) {
537 pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
538 } catch (Exception e) {
539 // Catch all exceptions to avoid scheduled task being suppressed.
540 log.error("Exception thrown while sending advertisement", e);
544 private Optional<NodeId> pickRandomActivePeer() {
545 List<NodeId> activePeers = clusterService.getNodes()
547 .map(ControllerNode::id)
548 .filter(id -> !localNodeId.equals(id))
549 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
550 .collect(Collectors.toList());
551 Collections.shuffle(activePeers);
552 return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
555 private void sendAdvertisementToPeer(NodeId peer) {
556 clusterCommunicator.unicast(createAdvertisement(),
557 antiEntropyAdvertisementSubject,
560 .whenComplete((result, error) -> {
562 log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
567 private AntiEntropyAdvertisement<K> createAdvertisement() {
568 return new AntiEntropyAdvertisement<K>(localNodeId,
569 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
572 private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
573 if (destroyed || underHighLoad()) {
577 if (log.isTraceEnabled()) {
578 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
579 mapName, ad.sender(), ad.digest().size());
581 antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
583 if (!lightweightAntiEntropy) {
584 // if remote ad has any entries that the local copy is missing, actively sync
585 // TODO: Missing keys is not the way local copy can be behind.
586 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
587 // TODO: Send ad for missing keys and for entries that are stale
588 sendAdvertisementToPeer(ad.sender());
591 } catch (Exception e) {
592 log.warn("Error handling anti-entropy advertisement", e);
597 * Processes anti-entropy ad from peer by taking following actions:
598 * 1. If peer has an old entry, updates peer.
599 * 2. If peer indicates an entry is removed and has a more recent
600 * timestamp than the local entry, update local state.
602 private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
603 AntiEntropyAdvertisement<K> ad) {
604 final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
605 final NodeId sender = ad.sender();
606 items.forEach((key, localValue) -> {
607 MapValue.Digest remoteValueDigest = ad.digest().get(key);
608 if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
609 // local value is more recent, push to sender
610 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
612 if (remoteValueDigest != null
613 && remoteValueDigest.isNewerThan(localValue.digest())
614 && remoteValueDigest.isTombstone()) {
615 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
616 MapValue<V> previousValue = removeInternal(key,
618 Optional.of(tombstone));
619 if (previousValue != null && previousValue.isAlive()) {
620 externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
624 return externalEvents;
627 private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
631 updates.forEach(update -> {
632 final K key = update.key();
633 final MapValue<V> value = update.value();
634 if (value == null || value.isTombstone()) {
635 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
636 if (previousValue != null && previousValue.isAlive()) {
637 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
639 } else if (putInternal(key, value)) {
640 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
645 // TODO pull this into the class if this gets pulled out...
646 private static final int DEFAULT_MAX_EVENTS = 1000;
647 private static final int DEFAULT_MAX_IDLE_MS = 10;
648 private static final int DEFAULT_MAX_BATCH_MS = 50;
649 private static final Timer TIMER = new Timer("onos-ecm-sender-events");
651 private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
653 private final NodeId peer;
655 private EventAccumulator(NodeId peer) {
656 super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
661 public void processItems(List<UpdateEntry<K, V>> items) {
662 Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
663 items.forEach(item -> map.compute(item.key(), (key, existing) ->
664 item.isNewerThan(existing) ? item : existing));
665 communicationExecutor.submit(() -> {
666 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
667 updateMessageSubject,
670 .whenComplete((result, error) -> {
672 log.debug("Failed to send to {}", peer, error);