2859b62f1cf5c53d494c4d9548cdd22143f575a7
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.ecmap;
17
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24
25 import org.apache.commons.lang3.tuple.Pair;
26 import org.onlab.util.AbstractAccumulator;
27 import org.onlab.util.KryoNamespace;
28 import org.onlab.util.SlidingWindowCounter;
29 import org.onosproject.cluster.ClusterService;
30 import org.onosproject.cluster.ControllerNode;
31 import org.onosproject.cluster.NodeId;
32 import org.onosproject.store.Timestamp;
33 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
34 import org.onosproject.store.cluster.messaging.MessageSubject;
35 import org.onosproject.store.impl.LogicalTimestamp;
36 import org.onosproject.store.service.WallClockTimestamp;
37 import org.onosproject.store.serializers.KryoNamespaces;
38 import org.onosproject.store.serializers.KryoSerializer;
39 import org.onosproject.store.service.EventuallyConsistentMap;
40 import org.onosproject.store.service.EventuallyConsistentMapEvent;
41 import org.onosproject.store.service.EventuallyConsistentMapListener;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44
45 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
46 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
47
48 import java.util.Collection;
49 import java.util.Collections;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.Objects;
53 import java.util.Optional;
54 import java.util.Set;
55 import java.util.Timer;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.ScheduledExecutorService;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.atomic.AtomicBoolean;
61 import java.util.concurrent.atomic.AtomicReference;
62 import java.util.function.BiFunction;
63 import java.util.stream.Collectors;
64
65 import static com.google.common.base.Preconditions.checkNotNull;
66 import static com.google.common.base.Preconditions.checkState;
67 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
68 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
69 import static org.onlab.util.Tools.groupedThreads;
70
71 /**
72  * Distributed Map implementation which uses optimistic replication and gossip
73  * based techniques to provide an eventually consistent data store.
74  */
75 public class EventuallyConsistentMapImpl<K, V>
76         implements EventuallyConsistentMap<K, V> {
77
    private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);

    // Backing store for the map; an entry is either a live value or a tombstone.
    private final Map<K, MapValue<V>> items;

    private final ClusterService clusterService;
    private final ClusterCommunicationService clusterCommunicator;
    // Serializer built from the user-supplied namespace plus the map's internal helper types.
    private final KryoSerializer serializer;
    // Identity of the local node, read from the cluster service at construction time.
    private final NodeId localNodeId;

    // Supplies the timestamp attached to each locally written or removed entry.
    private final BiFunction<K, V, Timestamp> timestampProvider;

    // Message subjects (derived from the map name) for update batches and anti-entropy advertisements.
    private final MessageSubject updateMessageSubject;
    private final MessageSubject antiEntropyAdvertisementSubject;

    // Listeners notified of PUT/REMOVE events; kept in a copy-on-write set.
    private final Set<EventuallyConsistentMapListener<K, V>> listeners
            = Sets.newCopyOnWriteArraySet();

    // Executor on which update messages received from peers are processed.
    private final ExecutorService executor;
    // Executor that runs the periodic anti-entropy task and handles received advertisements.
    private final ScheduledExecutorService backgroundExecutor;
    // Chooses the peers that are sent an update for a given key/value.
    private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;

    // Executor used to send batched updates to peers.
    private final ExecutorService communicationExecutor;
    // One accumulator per peer, holding updates queued for that peer.
    private final Map<NodeId, EventAccumulator> senderPending;

    private final String mapName;

    // Set by destroy(); once true the public operations fail their state check.
    private volatile boolean destroyed = false;
    private static final String ERROR_DESTROYED = " map is already destroyed";
    // Map-specific message (map name + ERROR_DESTROYED) used by those state checks.
    private final String destroyedMessage;

    private static final String ERROR_NULL_KEY = "Key cannot be null";
    private static final String ERROR_NULL_VALUE = "Null values are not allowed";

    // Delay before the first anti-entropy advertisement is scheduled.
    private final long initialDelaySec = 5;
    // When false, receiving an advertisement that lists keys missing locally
    // triggers an advertisement back to the sender.
    private final boolean lightweightAntiEntropy;
    // When true, removals do not leave tombstones behind.
    private final boolean tombstonesDisabled;

    // Load tracking: 'counter' is incremented by every internal put/remove and
    // sampled over LOAD_WINDOW by underHighLoad(), which compares against HIGH_LOAD_THRESHOLD.
    private static final int WINDOW_SIZE = 5;
    private static final int HIGH_LOAD_THRESHOLD = 0;
    private static final int LOAD_WINDOW = 2;
    private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);

    // Whether accepted changes are also written to persistentStore (which is null when false).
    private final boolean persistent;
    private final PersistentStore<K, V> persistentStore;
122
123     /**
124      * Creates a new eventually consistent map shared amongst multiple instances.
125      * <p>
126      * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
127      * for more description of the parameters expected by the map.
128      * </p>
129      *
130      * @param mapName               a String identifier for the map.
131      * @param clusterService        the cluster service
132      * @param clusterCommunicator   the cluster communications service
133      * @param serializerBuilder     a Kryo namespace builder that can serialize
134      *                              both K and V
135      * @param timestampProvider     provider of timestamps for K and V
136      * @param peerUpdateFunction    function that provides a set of nodes to immediately
137      *                              update to when there writes to the map
138      * @param eventExecutor         executor to use for processing incoming
139      *                              events from peers
140      * @param communicationExecutor executor to use for sending events to peers
141      * @param backgroundExecutor    executor to use for background anti-entropy
142      *                              tasks
143      * @param tombstonesDisabled    true if this map should not maintain
144      *                              tombstones
145      * @param antiEntropyPeriod     period that the anti-entropy task should run
146      * @param antiEntropyTimeUnit   time unit for anti-entropy period
147      * @param convergeFaster        make anti-entropy try to converge faster
148      * @param persistent            persist data to disk
149      */
150     EventuallyConsistentMapImpl(String mapName,
151                                 ClusterService clusterService,
152                                 ClusterCommunicationService clusterCommunicator,
153                                 KryoNamespace.Builder serializerBuilder,
154                                 BiFunction<K, V, Timestamp> timestampProvider,
155                                 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
156                                 ExecutorService eventExecutor,
157                                 ExecutorService communicationExecutor,
158                                 ScheduledExecutorService backgroundExecutor,
159                                 boolean tombstonesDisabled,
160                                 long antiEntropyPeriod,
161                                 TimeUnit antiEntropyTimeUnit,
162                                 boolean convergeFaster,
163                                 boolean persistent) {
164         this.mapName = mapName;
165         items = Maps.newConcurrentMap();
166         senderPending = Maps.newConcurrentMap();
167         destroyedMessage = mapName + ERROR_DESTROYED;
168
169         this.clusterService = clusterService;
170         this.clusterCommunicator = clusterCommunicator;
171         this.localNodeId = clusterService.getLocalNode().id();
172
173         this.serializer = createSerializer(serializerBuilder);
174
175         this.timestampProvider = timestampProvider;
176
177         if (peerUpdateFunction != null) {
178             this.peerUpdateFunction = peerUpdateFunction;
179         } else {
180             this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
181                     .map(ControllerNode::id)
182                     .filter(nodeId -> !nodeId.equals(localNodeId))
183                     .collect(Collectors.toList());
184         }
185
186         if (eventExecutor != null) {
187             this.executor = eventExecutor;
188         } else {
189             // should be a normal executor; it's used for receiving messages
190             this.executor =
191                     Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
192         }
193
194         if (communicationExecutor != null) {
195             this.communicationExecutor = communicationExecutor;
196         } else {
197             // sending executor; should be capped
198             //TODO this probably doesn't need to be bounded anymore
199             this.communicationExecutor =
200                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
201         }
202
203         this.persistent = persistent;
204
205         if (this.persistent) {
206             String dataDirectory = System.getProperty("karaf.data", "./data");
207             String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
208
209             ExecutorService dbExecutor =
210                     newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
211
212             persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
213             persistentStore.readInto(items);
214         } else {
215             this.persistentStore = null;
216         }
217
218         if (backgroundExecutor != null) {
219             this.backgroundExecutor = backgroundExecutor;
220         } else {
221             this.backgroundExecutor =
222                     newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
223         }
224
225         // start anti-entropy thread
226         this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
227                                                     initialDelaySec, antiEntropyPeriod,
228                                                     antiEntropyTimeUnit);
229
230         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
231         clusterCommunicator.addSubscriber(updateMessageSubject,
232                                           serializer::decode,
233                                           this::processUpdates,
234                                           this.executor);
235
236         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
237         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
238                                           serializer::decode,
239                                           this::handleAntiEntropyAdvertisement,
240                                           this.backgroundExecutor);
241
242         this.tombstonesDisabled = tombstonesDisabled;
243         this.lightweightAntiEntropy = !convergeFaster;
244     }
245
    /**
     * Creates the serializer used by this map, extending the supplied
     * namespace builder with the map's own internal types.
     *
     * @param builder user-supplied Kryo namespace builder; further
     *                registrations are applied to it when the pool is set up
     * @return serializer backed by the extended namespace
     */
    private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
        return new KryoSerializer() {
            @Override
            protected void setupKryoPool() {
                // Add the map's internal helper classes to the user-supplied serializer
                serializerPool = builder
                        .register(KryoNamespaces.BASIC)
                        .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
                        .register(LogicalTimestamp.class)
                        .register(WallClockTimestamp.class)
                        .register(AntiEntropyAdvertisement.class)
                        .register(UpdateEntry.class)
                        .register(MapValue.class)
                        .register(MapValue.Digest.class)
                        .build();
            }
        };
    }
264
265     @Override
266     public int size() {
267         checkState(!destroyed, destroyedMessage);
268         // TODO: Maintain a separate counter for tracking live elements in map.
269         return Maps.filterValues(items, MapValue::isAlive).size();
270     }
271
272     @Override
273     public boolean isEmpty() {
274         checkState(!destroyed, destroyedMessage);
275         return size() == 0;
276     }
277
278     @Override
279     public boolean containsKey(K key) {
280         checkState(!destroyed, destroyedMessage);
281         checkNotNull(key, ERROR_NULL_KEY);
282         return get(key) != null;
283     }
284
285     @Override
286     public boolean containsValue(V value) {
287         checkState(!destroyed, destroyedMessage);
288         checkNotNull(value, ERROR_NULL_VALUE);
289         return items.values()
290                     .stream()
291                     .filter(MapValue::isAlive)
292                     .anyMatch(v -> value.equals(v.get()));
293     }
294
295     @Override
296     public V get(K key) {
297         checkState(!destroyed, destroyedMessage);
298         checkNotNull(key, ERROR_NULL_KEY);
299
300         MapValue<V> value = items.get(key);
301         return (value == null || value.isTombstone()) ? null : value.get();
302     }
303
304     @Override
305     public void put(K key, V value) {
306         checkState(!destroyed, destroyedMessage);
307         checkNotNull(key, ERROR_NULL_KEY);
308         checkNotNull(value, ERROR_NULL_VALUE);
309
310         MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
311         if (putInternal(key, newValue)) {
312             notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
313             notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
314         }
315     }
316
317     @Override
318     public V remove(K key) {
319         checkState(!destroyed, destroyedMessage);
320         checkNotNull(key, ERROR_NULL_KEY);
321         return removeAndNotify(key, null);
322     }
323
    /**
     * Conditional removal: delegates to removeAndNotify with the expected
     * value, which is compared against the currently stored live value.
     */
    @Override
    public void remove(K key, V value) {
        checkState(!destroyed, destroyedMessage);
        checkNotNull(key, ERROR_NULL_KEY);
        checkNotNull(value, ERROR_NULL_VALUE);
        removeAndNotify(key, value);
    }
331
332     private V removeAndNotify(K key, V value) {
333         Timestamp timestamp = timestampProvider.apply(key, value);
334         Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
335                 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
336         MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
337         if (previousValue != null) {
338             notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
339                         peerUpdateFunction.apply(key, previousValue.get()));
340             if (previousValue.isAlive()) {
341                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
342             }
343         }
344         return previousValue != null ? previousValue.get() : null;
345     }
346
347     private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
348         checkState(!destroyed, destroyedMessage);
349         checkNotNull(key, ERROR_NULL_KEY);
350         checkNotNull(value, ERROR_NULL_VALUE);
351         tombstone.ifPresent(v -> checkState(v.isTombstone()));
352
353         counter.incrementCount();
354         AtomicBoolean updated = new AtomicBoolean(false);
355         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
356         items.compute(key, (k, existing) -> {
357             boolean valueMatches = true;
358             if (value.isPresent() && existing != null && existing.isAlive()) {
359                 valueMatches = Objects.equals(value.get(), existing.get());
360             }
361             if (existing == null) {
362                 log.debug("ECMap Remove: Existing value for key {} is already null", k);
363             }
364             if (valueMatches) {
365                 if (existing == null) {
366                     updated.set(tombstone.isPresent());
367                 } else {
368                     updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
369                 }
370             }
371             if (updated.get()) {
372                 previousValue.set(existing);
373                 return tombstone.orElse(null);
374             } else {
375                 return existing;
376             }
377         });
378         if (updated.get()) {
379             if (persistent) {
380                 if (tombstone.isPresent()) {
381                     persistentStore.update(key, tombstone.get());
382                 } else {
383                     persistentStore.remove(key);
384                 }
385             }
386         }
387         return previousValue.get();
388     }
389
390     @Override
391     public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
392         checkState(!destroyed, destroyedMessage);
393         checkNotNull(key, ERROR_NULL_KEY);
394         checkNotNull(recomputeFunction, "Recompute function cannot be null");
395
396         AtomicBoolean updated = new AtomicBoolean(false);
397         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
398         MapValue<V> computedValue = items.compute(key, (k, mv) -> {
399             previousValue.set(mv);
400             V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
401             MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
402             if (mv == null || newValue.isNewerThan(mv)) {
403                 updated.set(true);
404                 return newValue;
405             } else {
406                 return mv;
407             }
408         });
409         if (updated.get()) {
410             notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
411             EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
412             V value = computedValue.isTombstone()
413                     ? previousValue.get() == null ? null : previousValue.get().get()
414                     : computedValue.get();
415             if (value != null) {
416                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
417             }
418         }
419         return computedValue.get();
420     }
421
422     @Override
423     public void putAll(Map<? extends K, ? extends V> m) {
424         checkState(!destroyed, destroyedMessage);
425         m.forEach(this::put);
426     }
427
428     @Override
429     public void clear() {
430         checkState(!destroyed, destroyedMessage);
431         Maps.filterValues(items, MapValue::isAlive)
432             .forEach((k, v) -> remove(k));
433     }
434
435     @Override
436     public Set<K> keySet() {
437         checkState(!destroyed, destroyedMessage);
438         return Maps.filterValues(items, MapValue::isAlive)
439                    .keySet();
440     }
441
442     @Override
443     public Collection<V> values() {
444         checkState(!destroyed, destroyedMessage);
445         return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
446     }
447
448     @Override
449     public Set<Map.Entry<K, V>> entrySet() {
450         checkState(!destroyed, destroyedMessage);
451         return Maps.filterValues(items, MapValue::isAlive)
452                    .entrySet()
453                    .stream()
454                    .map(e -> Pair.of(e.getKey(), e.getValue().get()))
455                    .collect(Collectors.toSet());
456     }
457
458     /**
459      * Returns true if newValue was accepted i.e. map is updated.
460      * @param key key
461      * @param newValue proposed new value
462      * @return true if update happened; false if map already contains a more recent value for the key
463      */
464     private boolean putInternal(K key, MapValue<V> newValue) {
465         checkState(!destroyed, destroyedMessage);
466         checkNotNull(key, ERROR_NULL_KEY);
467         checkNotNull(newValue, ERROR_NULL_VALUE);
468         checkState(newValue.isAlive());
469         counter.incrementCount();
470         AtomicBoolean updated = new AtomicBoolean(false);
471         items.compute(key, (k, existing) -> {
472             if (existing == null || newValue.isNewerThan(existing)) {
473                 updated.set(true);
474                 return newValue;
475             }
476             return existing;
477         });
478         if (updated.get() && persistent) {
479             persistentStore.update(key, newValue);
480         }
481         return updated.get();
482     }
483
484     @Override
485     public void addListener(EventuallyConsistentMapListener<K, V> listener) {
486         checkState(!destroyed, destroyedMessage);
487
488         listeners.add(checkNotNull(listener));
489     }
490
491     @Override
492     public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
493         checkState(!destroyed, destroyedMessage);
494
495         listeners.remove(checkNotNull(listener));
496     }
497
498     @Override
499     public void destroy() {
500         destroyed = true;
501
502         executor.shutdown();
503         backgroundExecutor.shutdown();
504         communicationExecutor.shutdown();
505
506         listeners.clear();
507
508         clusterCommunicator.removeSubscriber(updateMessageSubject);
509         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
510     }
511
512     private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
513         listeners.forEach(listener -> listener.event(event));
514     }
515
    /**
     * Queues an update for delivery to the given peers.
     *
     * @param event update to send
     * @param peers nodes to send it to; handled by queueUpdate, which ignores null
     */
    private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
        queueUpdate(event, peers);
    }
519
520     private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
521         if (peers == null) {
522             // we have no friends :(
523             return;
524         }
525         peers.forEach(node ->
526             senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
527         );
528     }
529
    /**
     * Reports whether the update counter, sampled over LOAD_WINDOW, exceeds
     * HIGH_LOAD_THRESHOLD. Anti-entropy sending and handling are skipped
     * while this returns true.
     */
    private boolean underHighLoad() {
        return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
    }
533
534     private void sendAdvertisement() {
535         try {
536             if (underHighLoad() || destroyed) {
537                 return;
538             }
539             pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
540         } catch (Exception e) {
541             // Catch all exceptions to avoid scheduled task being suppressed.
542             log.error("Exception thrown while sending advertisement", e);
543         }
544     }
545
546     private Optional<NodeId> pickRandomActivePeer() {
547         List<NodeId> activePeers = clusterService.getNodes()
548                 .stream()
549                 .map(ControllerNode::id)
550                 .filter(id -> !localNodeId.equals(id))
551                 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
552                 .collect(Collectors.toList());
553         Collections.shuffle(activePeers);
554         return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
555     }
556
557     private void sendAdvertisementToPeer(NodeId peer) {
558         clusterCommunicator.unicast(createAdvertisement(),
559                 antiEntropyAdvertisementSubject,
560                 serializer::encode,
561                 peer)
562                 .whenComplete((result, error) -> {
563                     if (error != null) {
564                         log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
565                     }
566                 });
567     }
568
    /**
     * Builds an advertisement from the local node id and an immutable copy
     * of the per-key digests of every entry in the backing map (tombstones
     * included, since the map is not filtered here).
     */
    private AntiEntropyAdvertisement<K> createAdvertisement() {
        return new AntiEntropyAdvertisement<K>(localNodeId,
                ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
    }
573
574     private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
575         if (destroyed || underHighLoad()) {
576             return;
577         }
578         try {
579             log.debug("Received anti-entropy advertisement from {} for {} with {} entries in it",
580                     mapName, ad.sender(), ad.digest().size());
581             antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
582
583             if (!lightweightAntiEntropy) {
584                 // if remote ad has any entries that the local copy is missing, actively sync
585                 // TODO: Missing keys is not the way local copy can be behind.
586                 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
587                     // TODO: Send ad for missing keys and for entries that are stale
588                     sendAdvertisementToPeer(ad.sender());
589                 }
590             }
591         } catch (Exception e) {
592             log.warn("Error handling anti-entropy advertisement", e);
593         }
594     }
595
596     /**
597      * Processes anti-entropy ad from peer by taking following actions:
598      * 1. If peer has an old entry, updates peer.
599      * 2. If peer indicates an entry is removed and has a more recent
600      * timestamp than the local entry, update local state.
601      */
602     private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
603             AntiEntropyAdvertisement<K> ad) {
604         final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
605         final NodeId sender = ad.sender();
606         items.forEach((key, localValue) -> {
607             MapValue.Digest remoteValueDigest = ad.digest().get(key);
608             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
609                 // local value is more recent, push to sender
610                 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
611             }
612             if (remoteValueDigest != null
613                     && remoteValueDigest.isNewerThan(localValue.digest())
614                     && remoteValueDigest.isTombstone()) {
615                 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
616                 MapValue<V> previousValue = removeInternal(key,
617                                                            Optional.empty(),
618                                                            Optional.of(tombstone));
619                 if (previousValue != null && previousValue.isAlive()) {
620                     externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
621                 }
622             }
623         });
624         return externalEvents;
625     }
626
627     private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
628         if (destroyed) {
629             return;
630         }
631         updates.forEach(update -> {
632             final K key = update.key();
633             final MapValue<V> value = update.value();
634             if (value == null || value.isTombstone()) {
635                 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
636                 if (previousValue != null && previousValue.isAlive()) {
637                     notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
638                 }
639             } else if (putInternal(key, value)) {
640                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
641             }
642         });
643     }
644
    // TODO pull this into the class if this gets pulled out...
    // Settings passed to every EventAccumulator's superclass constructor, in this
    // order: max events, max batch time (ms), max idle time (ms).
    private static final int DEFAULT_MAX_EVENTS = 1000;
    private static final int DEFAULT_MAX_IDLE_MS = 10;
    private static final int DEFAULT_MAX_BATCH_MS = 50;
    // Single timer shared by the accumulators of all map instances.
    private static final Timer TIMER = new Timer("onos-ecm-sender-events");
650
651     private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
652
653         private final NodeId peer;
654
655         private EventAccumulator(NodeId peer) {
656             super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
657             this.peer = peer;
658         }
659
660         @Override
661         public void processItems(List<UpdateEntry<K, V>> items) {
662             Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
663             items.forEach(item -> map.compute(item.key(), (key, existing) ->
664                     item.isNewerThan(existing) ? item : existing));
665             communicationExecutor.submit(() -> {
666                 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
667                                             updateMessageSubject,
668                                             serializer::encode,
669                                             peer)
670                                    .whenComplete((result, error) -> {
671                                        if (error != null) {
672                                            log.debug("Failed to send to {}", peer, error);
673                                        }
674                                    });
675             });
676         }
677     }
678 }