f1e0dbd45c80f7a8f0af1c06cf1f5d052606a10e
[onosfw.git] /
1 /*
2  * Copyright 2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.ecmap;
17
18 import com.google.common.collect.Collections2;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableMap;
21 import com.google.common.collect.Lists;
22 import com.google.common.collect.Maps;
23 import com.google.common.collect.Sets;
24 import org.apache.commons.lang3.tuple.Pair;
25 import org.onlab.util.AbstractAccumulator;
26 import org.onlab.util.KryoNamespace;
27 import org.onlab.util.SlidingWindowCounter;
28 import org.onosproject.cluster.ClusterService;
29 import org.onosproject.cluster.ControllerNode;
30 import org.onosproject.cluster.NodeId;
31 import org.onosproject.store.Timestamp;
32 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
33 import org.onosproject.store.cluster.messaging.MessageSubject;
34 import org.onosproject.store.impl.LogicalTimestamp;
35 import org.onosproject.store.serializers.KryoNamespaces;
36 import org.onosproject.store.serializers.KryoSerializer;
37 import org.onosproject.store.service.EventuallyConsistentMap;
38 import org.onosproject.store.service.EventuallyConsistentMapEvent;
39 import org.onosproject.store.service.EventuallyConsistentMapListener;
40 import org.onosproject.store.service.WallClockTimestamp;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.util.Collection;
45 import java.util.Collections;
46 import java.util.List;
47 import java.util.Map;
48 import java.util.Objects;
49 import java.util.Optional;
50 import java.util.Set;
51 import java.util.Timer;
52 import java.util.concurrent.ExecutorService;
53 import java.util.concurrent.Executors;
54 import java.util.concurrent.ScheduledExecutorService;
55 import java.util.concurrent.TimeUnit;
56 import java.util.concurrent.atomic.AtomicBoolean;
57 import java.util.concurrent.atomic.AtomicReference;
58 import java.util.function.BiFunction;
59 import java.util.stream.Collectors;
60
61 import static com.google.common.base.Preconditions.checkNotNull;
62 import static com.google.common.base.Preconditions.checkState;
63 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
64 import static org.onlab.util.BoundedThreadPool.newFixedThreadPool;
65 import static org.onlab.util.Tools.groupedThreads;
66 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
67 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
68
69 /**
70  * Distributed Map implementation which uses optimistic replication and gossip
71  * based techniques to provide an eventually consistent data store.
72  */
73 public class EventuallyConsistentMapImpl<K, V>
74         implements EventuallyConsistentMap<K, V> {
75
76     private static final Logger log = LoggerFactory.getLogger(EventuallyConsistentMapImpl.class);
77
78     private final Map<K, MapValue<V>> items;
79
80     private final ClusterService clusterService;
81     private final ClusterCommunicationService clusterCommunicator;
82     private final KryoSerializer serializer;
83     private final NodeId localNodeId;
84
85     private final BiFunction<K, V, Timestamp> timestampProvider;
86
87     private final MessageSubject updateMessageSubject;
88     private final MessageSubject antiEntropyAdvertisementSubject;
89
90     private final Set<EventuallyConsistentMapListener<K, V>> listeners
91             = Sets.newCopyOnWriteArraySet();
92
93     private final ExecutorService executor;
94     private final ScheduledExecutorService backgroundExecutor;
95     private final BiFunction<K, V, Collection<NodeId>> peerUpdateFunction;
96
97     private final ExecutorService communicationExecutor;
98     private final Map<NodeId, EventAccumulator> senderPending;
99
100     private final String mapName;
101
102     private volatile boolean destroyed = false;
103     private static final String ERROR_DESTROYED = " map is already destroyed";
104     private final String destroyedMessage;
105
106     private static final String ERROR_NULL_KEY = "Key cannot be null";
107     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
108
109     private final long initialDelaySec = 5;
110     private final boolean lightweightAntiEntropy;
111     private final boolean tombstonesDisabled;
112
113     private static final int WINDOW_SIZE = 5;
114     private static final int HIGH_LOAD_THRESHOLD = 0;
115     private static final int LOAD_WINDOW = 2;
116     private SlidingWindowCounter counter = new SlidingWindowCounter(WINDOW_SIZE);
117
118     private final boolean persistent;
119     private final PersistentStore<K, V> persistentStore;
120
121     /**
122      * Creates a new eventually consistent map shared amongst multiple instances.
123      * <p>
124      * See {@link org.onosproject.store.service.EventuallyConsistentMapBuilder}
125      * for more description of the parameters expected by the map.
126      * </p>
127      *
128      * @param mapName               a String identifier for the map.
129      * @param clusterService        the cluster service
130      * @param clusterCommunicator   the cluster communications service
131      * @param serializerBuilder     a Kryo namespace builder that can serialize
132      *                              both K and V
133      * @param timestampProvider     provider of timestamps for K and V
134      * @param peerUpdateFunction    function that provides a set of nodes to immediately
135      *                              update to when there writes to the map
136      * @param eventExecutor         executor to use for processing incoming
137      *                              events from peers
138      * @param communicationExecutor executor to use for sending events to peers
139      * @param backgroundExecutor    executor to use for background anti-entropy
140      *                              tasks
141      * @param tombstonesDisabled    true if this map should not maintain
142      *                              tombstones
143      * @param antiEntropyPeriod     period that the anti-entropy task should run
144      * @param antiEntropyTimeUnit   time unit for anti-entropy period
145      * @param convergeFaster        make anti-entropy try to converge faster
146      * @param persistent            persist data to disk
147      */
148     EventuallyConsistentMapImpl(String mapName,
149                                 ClusterService clusterService,
150                                 ClusterCommunicationService clusterCommunicator,
151                                 KryoNamespace.Builder serializerBuilder,
152                                 BiFunction<K, V, Timestamp> timestampProvider,
153                                 BiFunction<K, V, Collection<NodeId>> peerUpdateFunction,
154                                 ExecutorService eventExecutor,
155                                 ExecutorService communicationExecutor,
156                                 ScheduledExecutorService backgroundExecutor,
157                                 boolean tombstonesDisabled,
158                                 long antiEntropyPeriod,
159                                 TimeUnit antiEntropyTimeUnit,
160                                 boolean convergeFaster,
161                                 boolean persistent) {
162         this.mapName = mapName;
163         items = Maps.newConcurrentMap();
164         senderPending = Maps.newConcurrentMap();
165         destroyedMessage = mapName + ERROR_DESTROYED;
166
167         this.clusterService = clusterService;
168         this.clusterCommunicator = clusterCommunicator;
169         this.localNodeId = clusterService.getLocalNode().id();
170
171         this.serializer = createSerializer(serializerBuilder);
172
173         this.timestampProvider = timestampProvider;
174
175         if (peerUpdateFunction != null) {
176             this.peerUpdateFunction = peerUpdateFunction;
177         } else {
178             this.peerUpdateFunction = (key, value) -> clusterService.getNodes().stream()
179                     .map(ControllerNode::id)
180                     .filter(nodeId -> !nodeId.equals(localNodeId))
181                     .collect(Collectors.toList());
182         }
183
184         if (eventExecutor != null) {
185             this.executor = eventExecutor;
186         } else {
187             // should be a normal executor; it's used for receiving messages
188             this.executor =
189                     Executors.newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-fg-%d"));
190         }
191
192         if (communicationExecutor != null) {
193             this.communicationExecutor = communicationExecutor;
194         } else {
195             // sending executor; should be capped
196             //TODO this probably doesn't need to be bounded anymore
197             this.communicationExecutor =
198                     newFixedThreadPool(8, groupedThreads("onos/ecm", mapName + "-publish-%d"));
199         }
200
201         this.persistent = persistent;
202
203         if (this.persistent) {
204             String dataDirectory = System.getProperty("karaf.data", "./data");
205             String filename = dataDirectory + "/" + "mapdb-ecm-" + mapName;
206
207             ExecutorService dbExecutor =
208                     newFixedThreadPool(1, groupedThreads("onos/ecm", mapName + "-dbwriter"));
209
210             persistentStore = new MapDbPersistentStore<>(filename, dbExecutor, serializer);
211             persistentStore.readInto(items);
212         } else {
213             this.persistentStore = null;
214         }
215
216         if (backgroundExecutor != null) {
217             this.backgroundExecutor = backgroundExecutor;
218         } else {
219             this.backgroundExecutor =
220                     newSingleThreadScheduledExecutor(groupedThreads("onos/ecm", mapName + "-bg-%d"));
221         }
222
223         // start anti-entropy thread
224         this.backgroundExecutor.scheduleAtFixedRate(this::sendAdvertisement,
225                                                     initialDelaySec, antiEntropyPeriod,
226                                                     antiEntropyTimeUnit);
227
228         updateMessageSubject = new MessageSubject("ecm-" + mapName + "-update");
229         clusterCommunicator.addSubscriber(updateMessageSubject,
230                                           serializer::decode,
231                                           this::processUpdates,
232                                           this.executor);
233
234         antiEntropyAdvertisementSubject = new MessageSubject("ecm-" + mapName + "-anti-entropy");
235         clusterCommunicator.addSubscriber(antiEntropyAdvertisementSubject,
236                                           serializer::decode,
237                                           this::handleAntiEntropyAdvertisement,
238                                           this.backgroundExecutor);
239
240         this.tombstonesDisabled = tombstonesDisabled;
241         this.lightweightAntiEntropy = !convergeFaster;
242     }
243
244     private KryoSerializer createSerializer(KryoNamespace.Builder builder) {
245         return new KryoSerializer() {
246             @Override
247             protected void setupKryoPool() {
248                 // Add the map's internal helper classes to the user-supplied serializer
249                 serializerPool = builder
250                         .register(KryoNamespaces.BASIC)
251                         .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
252                         .register(LogicalTimestamp.class)
253                         .register(WallClockTimestamp.class)
254                         .register(AntiEntropyAdvertisement.class)
255                         .register(UpdateEntry.class)
256                         .register(MapValue.class)
257                         .register(MapValue.Digest.class)
258                         .build();
259             }
260         };
261     }
262
263     @Override
264     public int size() {
265         checkState(!destroyed, destroyedMessage);
266         // TODO: Maintain a separate counter for tracking live elements in map.
267         return Maps.filterValues(items, MapValue::isAlive).size();
268     }
269
270     @Override
271     public boolean isEmpty() {
272         checkState(!destroyed, destroyedMessage);
273         return size() == 0;
274     }
275
276     @Override
277     public boolean containsKey(K key) {
278         checkState(!destroyed, destroyedMessage);
279         checkNotNull(key, ERROR_NULL_KEY);
280         return get(key) != null;
281     }
282
283     @Override
284     public boolean containsValue(V value) {
285         checkState(!destroyed, destroyedMessage);
286         checkNotNull(value, ERROR_NULL_VALUE);
287         return items.values()
288                     .stream()
289                     .filter(MapValue::isAlive)
290                     .anyMatch(v -> value.equals(v.get()));
291     }
292
293     @Override
294     public V get(K key) {
295         checkState(!destroyed, destroyedMessage);
296         checkNotNull(key, ERROR_NULL_KEY);
297
298         MapValue<V> value = items.get(key);
299         return (value == null || value.isTombstone()) ? null : value.get();
300     }
301
302     @Override
303     public void put(K key, V value) {
304         checkState(!destroyed, destroyedMessage);
305         checkNotNull(key, ERROR_NULL_KEY);
306         checkNotNull(value, ERROR_NULL_VALUE);
307
308         MapValue<V> newValue = new MapValue<>(value, timestampProvider.apply(key, value));
309         if (putInternal(key, newValue)) {
310             notifyPeers(new UpdateEntry<>(key, newValue), peerUpdateFunction.apply(key, value));
311             notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value));
312         }
313     }
314
315     @Override
316     public V remove(K key) {
317         checkState(!destroyed, destroyedMessage);
318         checkNotNull(key, ERROR_NULL_KEY);
319         return removeAndNotify(key, null);
320     }
321
322     @Override
323     public void remove(K key, V value) {
324         checkState(!destroyed, destroyedMessage);
325         checkNotNull(key, ERROR_NULL_KEY);
326         checkNotNull(value, ERROR_NULL_VALUE);
327         removeAndNotify(key, value);
328     }
329
330     private V removeAndNotify(K key, V value) {
331         Timestamp timestamp = timestampProvider.apply(key, value);
332         Optional<MapValue<V>> tombstone = tombstonesDisabled || timestamp == null
333                 ? Optional.empty() : Optional.of(MapValue.tombstone(timestamp));
334         MapValue<V> previousValue = removeInternal(key, Optional.ofNullable(value), tombstone);
335         if (previousValue != null) {
336             notifyPeers(new UpdateEntry<>(key, tombstone.orElse(null)),
337                         peerUpdateFunction.apply(key, previousValue.get()));
338             if (previousValue.isAlive()) {
339                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
340             }
341         }
342         return previousValue != null ? previousValue.get() : null;
343     }
344
345     private MapValue<V> removeInternal(K key, Optional<V> value, Optional<MapValue<V>> tombstone) {
346         checkState(!destroyed, destroyedMessage);
347         checkNotNull(key, ERROR_NULL_KEY);
348         checkNotNull(value, ERROR_NULL_VALUE);
349         tombstone.ifPresent(v -> checkState(v.isTombstone()));
350
351         counter.incrementCount();
352         AtomicBoolean updated = new AtomicBoolean(false);
353         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
354         items.compute(key, (k, existing) -> {
355             boolean valueMatches = true;
356             if (value.isPresent() && existing != null && existing.isAlive()) {
357                 valueMatches = Objects.equals(value.get(), existing.get());
358             }
359             if (existing == null) {
360                 log.trace("ECMap Remove: Existing value for key {} is already null", k);
361             }
362             if (valueMatches) {
363                 if (existing == null) {
364                     updated.set(tombstone.isPresent());
365                 } else {
366                     updated.set(!tombstone.isPresent() || tombstone.get().isNewerThan(existing));
367                 }
368             }
369             if (updated.get()) {
370                 previousValue.set(existing);
371                 return tombstone.orElse(null);
372             } else {
373                 return existing;
374             }
375         });
376         if (updated.get()) {
377             if (persistent) {
378                 if (tombstone.isPresent()) {
379                     persistentStore.update(key, tombstone.get());
380                 } else {
381                     persistentStore.remove(key);
382                 }
383             }
384         }
385         return previousValue.get();
386     }
387
388     @Override
389     public V compute(K key, BiFunction<K, V, V> recomputeFunction) {
390         checkState(!destroyed, destroyedMessage);
391         checkNotNull(key, ERROR_NULL_KEY);
392         checkNotNull(recomputeFunction, "Recompute function cannot be null");
393
394         AtomicBoolean updated = new AtomicBoolean(false);
395         AtomicReference<MapValue<V>> previousValue = new AtomicReference<>();
396         MapValue<V> computedValue = items.compute(key, (k, mv) -> {
397             previousValue.set(mv);
398             V newRawValue = recomputeFunction.apply(key, mv == null ? null : mv.get());
399             MapValue<V> newValue = new MapValue<>(newRawValue, timestampProvider.apply(key, newRawValue));
400             if (mv == null || newValue.isNewerThan(mv)) {
401                 updated.set(true);
402                 return newValue;
403             } else {
404                 return mv;
405             }
406         });
407         if (updated.get()) {
408             notifyPeers(new UpdateEntry<>(key, computedValue), peerUpdateFunction.apply(key, computedValue.get()));
409             EventuallyConsistentMapEvent.Type updateType = computedValue.isTombstone() ? REMOVE : PUT;
410             V value = computedValue.isTombstone()
411                     ? previousValue.get() == null ? null : previousValue.get().get()
412                     : computedValue.get();
413             if (value != null) {
414                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, updateType, key, value));
415             }
416         }
417         return computedValue.get();
418     }
419
420     @Override
421     public void putAll(Map<? extends K, ? extends V> m) {
422         checkState(!destroyed, destroyedMessage);
423         m.forEach(this::put);
424     }
425
426     @Override
427     public void clear() {
428         checkState(!destroyed, destroyedMessage);
429         Maps.filterValues(items, MapValue::isAlive)
430             .forEach((k, v) -> remove(k));
431     }
432
433     @Override
434     public Set<K> keySet() {
435         checkState(!destroyed, destroyedMessage);
436         return Maps.filterValues(items, MapValue::isAlive)
437                    .keySet();
438     }
439
440     @Override
441     public Collection<V> values() {
442         checkState(!destroyed, destroyedMessage);
443         return Collections2.transform(Maps.filterValues(items, MapValue::isAlive).values(), MapValue::get);
444     }
445
446     @Override
447     public Set<Map.Entry<K, V>> entrySet() {
448         checkState(!destroyed, destroyedMessage);
449         return Maps.filterValues(items, MapValue::isAlive)
450                    .entrySet()
451                    .stream()
452                    .map(e -> Pair.of(e.getKey(), e.getValue().get()))
453                    .collect(Collectors.toSet());
454     }
455
456     /**
457      * Returns true if newValue was accepted i.e. map is updated.
458      * @param key key
459      * @param newValue proposed new value
460      * @return true if update happened; false if map already contains a more recent value for the key
461      */
462     private boolean putInternal(K key, MapValue<V> newValue) {
463         checkState(!destroyed, destroyedMessage);
464         checkNotNull(key, ERROR_NULL_KEY);
465         checkNotNull(newValue, ERROR_NULL_VALUE);
466         checkState(newValue.isAlive());
467         counter.incrementCount();
468         AtomicBoolean updated = new AtomicBoolean(false);
469         items.compute(key, (k, existing) -> {
470             if (existing == null || newValue.isNewerThan(existing)) {
471                 updated.set(true);
472                 return newValue;
473             }
474             return existing;
475         });
476         if (updated.get() && persistent) {
477             persistentStore.update(key, newValue);
478         }
479         return updated.get();
480     }
481
482     @Override
483     public void addListener(EventuallyConsistentMapListener<K, V> listener) {
484         checkState(!destroyed, destroyedMessage);
485
486         listeners.add(checkNotNull(listener));
487     }
488
489     @Override
490     public void removeListener(EventuallyConsistentMapListener<K, V> listener) {
491         checkState(!destroyed, destroyedMessage);
492
493         listeners.remove(checkNotNull(listener));
494     }
495
496     @Override
497     public void destroy() {
498         destroyed = true;
499
500         executor.shutdown();
501         backgroundExecutor.shutdown();
502         communicationExecutor.shutdown();
503
504         listeners.clear();
505
506         clusterCommunicator.removeSubscriber(updateMessageSubject);
507         clusterCommunicator.removeSubscriber(antiEntropyAdvertisementSubject);
508     }
509
510     private void notifyListeners(EventuallyConsistentMapEvent<K, V> event) {
511         listeners.forEach(listener -> listener.event(event));
512     }
513
514     private void notifyPeers(UpdateEntry<K, V> event, Collection<NodeId> peers) {
515         queueUpdate(event, peers);
516     }
517
518     private void queueUpdate(UpdateEntry<K, V> event, Collection<NodeId> peers) {
519         if (peers == null) {
520             // we have no friends :(
521             return;
522         }
523         peers.forEach(node ->
524                         senderPending.computeIfAbsent(node, unusedKey -> new EventAccumulator(node)).add(event)
525         );
526     }
527
528     private boolean underHighLoad() {
529         return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
530     }
531
532     private void sendAdvertisement() {
533         try {
534             if (underHighLoad() || destroyed) {
535                 return;
536             }
537             pickRandomActivePeer().ifPresent(this::sendAdvertisementToPeer);
538         } catch (Exception e) {
539             // Catch all exceptions to avoid scheduled task being suppressed.
540             log.error("Exception thrown while sending advertisement", e);
541         }
542     }
543
544     private Optional<NodeId> pickRandomActivePeer() {
545         List<NodeId> activePeers = clusterService.getNodes()
546                 .stream()
547                 .map(ControllerNode::id)
548                 .filter(id -> !localNodeId.equals(id))
549                 .filter(id -> clusterService.getState(id) == ControllerNode.State.ACTIVE)
550                 .collect(Collectors.toList());
551         Collections.shuffle(activePeers);
552         return activePeers.isEmpty() ? Optional.empty() : Optional.of(activePeers.get(0));
553     }
554
555     private void sendAdvertisementToPeer(NodeId peer) {
556         clusterCommunicator.unicast(createAdvertisement(),
557                 antiEntropyAdvertisementSubject,
558                 serializer::encode,
559                 peer)
560                 .whenComplete((result, error) -> {
561                     if (error != null) {
562                         log.debug("Failed to send anti-entropy advertisement to {}", peer, error);
563                     }
564                 });
565     }
566
567     private AntiEntropyAdvertisement<K> createAdvertisement() {
568         return new AntiEntropyAdvertisement<K>(localNodeId,
569                 ImmutableMap.copyOf(Maps.transformValues(items, MapValue::digest)));
570     }
571
572     private void handleAntiEntropyAdvertisement(AntiEntropyAdvertisement<K> ad) {
573         if (destroyed || underHighLoad()) {
574             return;
575         }
576         try {
577             if (log.isTraceEnabled()) {
578                 log.trace("Received anti-entropy advertisement from {} for {} with {} entries in it",
579                         mapName, ad.sender(), ad.digest().size());
580             }
581             antiEntropyCheckLocalItems(ad).forEach(this::notifyListeners);
582
583             if (!lightweightAntiEntropy) {
584                 // if remote ad has any entries that the local copy is missing, actively sync
585                 // TODO: Missing keys is not the way local copy can be behind.
586                 if (Sets.difference(ad.digest().keySet(), items.keySet()).size() > 0) {
587                     // TODO: Send ad for missing keys and for entries that are stale
588                     sendAdvertisementToPeer(ad.sender());
589                 }
590             }
591         } catch (Exception e) {
592             log.warn("Error handling anti-entropy advertisement", e);
593         }
594     }
595
596     /**
597      * Processes anti-entropy ad from peer by taking following actions:
598      * 1. If peer has an old entry, updates peer.
599      * 2. If peer indicates an entry is removed and has a more recent
600      * timestamp than the local entry, update local state.
601      */
602     private List<EventuallyConsistentMapEvent<K, V>> antiEntropyCheckLocalItems(
603             AntiEntropyAdvertisement<K> ad) {
604         final List<EventuallyConsistentMapEvent<K, V>> externalEvents = Lists.newLinkedList();
605         final NodeId sender = ad.sender();
606         items.forEach((key, localValue) -> {
607             MapValue.Digest remoteValueDigest = ad.digest().get(key);
608             if (remoteValueDigest == null || localValue.isNewerThan(remoteValueDigest.timestamp())) {
609                 // local value is more recent, push to sender
610                 queueUpdate(new UpdateEntry<>(key, localValue), ImmutableList.of(sender));
611             }
612             if (remoteValueDigest != null
613                     && remoteValueDigest.isNewerThan(localValue.digest())
614                     && remoteValueDigest.isTombstone()) {
615                 MapValue<V> tombstone = MapValue.tombstone(remoteValueDigest.timestamp());
616                 MapValue<V> previousValue = removeInternal(key,
617                                                            Optional.empty(),
618                                                            Optional.of(tombstone));
619                 if (previousValue != null && previousValue.isAlive()) {
620                     externalEvents.add(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
621                 }
622             }
623         });
624         return externalEvents;
625     }
626
627     private void processUpdates(Collection<UpdateEntry<K, V>> updates) {
628         if (destroyed) {
629             return;
630         }
631         updates.forEach(update -> {
632             final K key = update.key();
633             final MapValue<V> value = update.value();
634             if (value == null || value.isTombstone()) {
635                 MapValue<V> previousValue = removeInternal(key, Optional.empty(), Optional.ofNullable(value));
636                 if (previousValue != null && previousValue.isAlive()) {
637                     notifyListeners(new EventuallyConsistentMapEvent<>(mapName, REMOVE, key, previousValue.get()));
638                 }
639             } else if (putInternal(key, value)) {
640                 notifyListeners(new EventuallyConsistentMapEvent<>(mapName, PUT, key, value.get()));
641             }
642         });
643     }
644
645     // TODO pull this into the class if this gets pulled out...
646     private static final int DEFAULT_MAX_EVENTS = 1000;
647     private static final int DEFAULT_MAX_IDLE_MS = 10;
648     private static final int DEFAULT_MAX_BATCH_MS = 50;
649     private static final Timer TIMER = new Timer("onos-ecm-sender-events");
650
651     private final class EventAccumulator extends AbstractAccumulator<UpdateEntry<K, V>> {
652
653         private final NodeId peer;
654
655         private EventAccumulator(NodeId peer) {
656             super(TIMER, DEFAULT_MAX_EVENTS, DEFAULT_MAX_BATCH_MS, DEFAULT_MAX_IDLE_MS);
657             this.peer = peer;
658         }
659
660         @Override
661         public void processItems(List<UpdateEntry<K, V>> items) {
662             Map<K, UpdateEntry<K, V>> map = Maps.newHashMap();
663             items.forEach(item -> map.compute(item.key(), (key, existing) ->
664                     item.isNewerThan(existing) ? item : existing));
665             communicationExecutor.submit(() -> {
666                 clusterCommunicator.unicast(ImmutableList.copyOf(map.values()),
667                                             updateMessageSubject,
668                                             serializer::encode,
669                                             peer)
670                                    .whenComplete((result, error) -> {
671                                        if (error != null) {
672                                            log.debug("Failed to send to {}", peer, error);
673                                        }
674                                    });
675             });
676         }
677     }
678 }