767ede543ec95d519fbd1726d25f926c885a7dae
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.link.impl;
17
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Multimaps;
22 import com.google.common.collect.SetMultimap;
23 import com.google.common.collect.Sets;
24 import org.apache.commons.lang3.RandomUtils;
25 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service;
31 import org.onlab.util.KryoNamespace;
32 import org.onosproject.cluster.ClusterService;
33 import org.onosproject.cluster.ControllerNode;
34 import org.onosproject.cluster.NodeId;
35 import org.onosproject.mastership.MastershipService;
36 import org.onosproject.net.AnnotationKeys;
37 import org.onosproject.net.AnnotationsUtil;
38 import org.onosproject.net.ConnectPoint;
39 import org.onosproject.net.DefaultAnnotations;
40 import org.onosproject.net.DefaultLink;
41 import org.onosproject.net.DeviceId;
42 import org.onosproject.net.Link;
43 import org.onosproject.net.Link.Type;
44 import org.onosproject.net.LinkKey;
45 import org.onosproject.net.SparseAnnotations;
46 import org.onosproject.net.device.DeviceClockService;
47 import org.onosproject.net.link.DefaultLinkDescription;
48 import org.onosproject.net.link.LinkDescription;
49 import org.onosproject.net.link.LinkEvent;
50 import org.onosproject.net.link.LinkStore;
51 import org.onosproject.net.link.LinkStoreDelegate;
52 import org.onosproject.net.provider.ProviderId;
53 import org.onosproject.store.AbstractStore;
54 import org.onosproject.store.Timestamp;
55 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
56 import org.onosproject.store.cluster.messaging.ClusterMessage;
57 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
58 import org.onosproject.store.cluster.messaging.MessageSubject;
59 import org.onosproject.store.impl.Timestamped;
60 import org.onosproject.store.serializers.KryoSerializer;
61 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
62 import org.slf4j.Logger;
63
64 import java.io.IOException;
65 import java.util.Collection;
66 import java.util.Collections;
67 import java.util.HashMap;
68 import java.util.HashSet;
69 import java.util.Map;
70 import java.util.Map.Entry;
71 import java.util.Objects;
72 import java.util.Set;
73 import java.util.concurrent.ConcurrentHashMap;
74 import java.util.concurrent.ConcurrentMap;
75 import java.util.concurrent.ExecutorService;
76 import java.util.concurrent.Executors;
77 import java.util.concurrent.ScheduledExecutorService;
78 import java.util.concurrent.TimeUnit;
79
80 import static com.google.common.base.Preconditions.checkNotNull;
81 import static com.google.common.base.Predicates.notNull;
82 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
83 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
84 import static org.onlab.util.Tools.groupedThreads;
85 import static org.onlab.util.Tools.minPriority;
86 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
87 import static org.onosproject.net.DefaultAnnotations.merge;
88 import static org.onosproject.net.DefaultAnnotations.union;
89 import static org.onosproject.net.Link.State.ACTIVE;
90 import static org.onosproject.net.Link.State.INACTIVE;
91 import static org.onosproject.net.Link.Type.DIRECT;
92 import static org.onosproject.net.Link.Type.INDIRECT;
93 import static org.onosproject.net.LinkKey.linkKey;
94 import static org.onosproject.net.link.LinkEvent.Type.*;
95 import static org.onosproject.store.link.impl.GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT;
96 import static org.slf4j.LoggerFactory.getLogger;
97
98 /**
99  * Manages inventory of infrastructure links in distributed data store
100  * that uses optimistic replication and gossip based techniques.
101  */
102 @Component(immediate = true, enabled = false)
103 @Service
104 public class GossipLinkStore
105         extends AbstractStore<LinkEvent, LinkStoreDelegate>
106         implements LinkStore {
107
    // Timeout in milliseconds to process links on remote master node
    // NOTE(review): not referenced in the visible part of this class — confirm it is still used.
    private static final int REMOTE_MASTER_TIMEOUT = 1000;

    private final Logger log = getLogger(getClass());

    // Link inventory: per link, the latest timestamped description from each provider.
    // The inner map is a plain HashMap; code in this class synchronizes on it before use.
    private final ConcurrentMap<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>> linkDescs =
        new ConcurrentHashMap<>();

    // Link instance cache (composed Link per key)
    private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();

    // Egress and ingress link sets, indexed by source / destination device
    private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
    private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();

    // Removed links: timestamp of the most recent accepted removal per link key
    private final Map<LinkKey, Timestamp> removedLinks = new ConcurrentHashMap<>();

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceClockService deviceClockService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterCommunicationService clusterCommunicator;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected ClusterService clusterService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected MastershipService mastershipService;

    // Serializer used for every message this store sends to peers; registers the
    // store's own message types on top of the common distributed-store namespace.
    protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
        @Override
        protected void setupKryoPool() {
            serializerPool = KryoNamespace.newBuilder()
                    .register(DistributedStoreSerializers.STORE_COMMON)
                    .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
                    .register(InternalLinkEvent.class)
                    .register(InternalLinkRemovedEvent.class)
                    .register(LinkAntiEntropyAdvertisement.class)
                    .register(LinkFragmentId.class)
                    .register(LinkInjectedEvent.class)
                    .build();
        }
    };

    // Executor handed to the link update / removed / injected message subscribers
    private ExecutorService executor;

    // Single-threaded scheduler for anti-entropy advertisements (sending and handling)
    private ScheduledExecutorService backgroundExecutors;

157
158     @Activate
159     public void activate() {
160
161         executor = Executors.newCachedThreadPool(groupedThreads("onos/link", "fg-%d"));
162
163         backgroundExecutors =
164                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/link", "bg-%d")));
165
166         clusterCommunicator.addSubscriber(
167                 GossipLinkStoreMessageSubjects.LINK_UPDATE,
168                 new InternalLinkEventListener(), executor);
169         clusterCommunicator.addSubscriber(
170                 GossipLinkStoreMessageSubjects.LINK_REMOVED,
171                 new InternalLinkRemovedEventListener(), executor);
172         clusterCommunicator.addSubscriber(
173                 GossipLinkStoreMessageSubjects.LINK_ANTI_ENTROPY_ADVERTISEMENT,
174                 new InternalLinkAntiEntropyAdvertisementListener(), backgroundExecutors);
175         clusterCommunicator.addSubscriber(
176                 GossipLinkStoreMessageSubjects.LINK_INJECTED,
177                 new LinkInjectedEventListener(), executor);
178
179         long initialDelaySec = 5;
180         long periodSec = 5;
181         // start anti-entropy thread
182         backgroundExecutors.scheduleAtFixedRate(new SendAdvertisementTask(),
183                     initialDelaySec, periodSec, TimeUnit.SECONDS);
184
185         log.info("Started");
186     }
187
188     @Deactivate
189     public void deactivate() {
190
191         executor.shutdownNow();
192
193         backgroundExecutors.shutdownNow();
194         try {
195             if (!backgroundExecutors.awaitTermination(5, TimeUnit.SECONDS)) {
196                 log.error("Timeout during executor shutdown");
197             }
198         } catch (InterruptedException e) {
199             log.error("Error during executor shutdown", e);
200         }
201
202         linkDescs.clear();
203         links.clear();
204         srcLinks.clear();
205         dstLinks.clear();
206         log.info("Stopped");
207     }
208
209     @Override
210     public int getLinkCount() {
211         return links.size();
212     }
213
214     @Override
215     public Iterable<Link> getLinks() {
216         return Collections.unmodifiableCollection(links.values());
217     }
218
219     @Override
220     public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
221         // lock for iteration
222         synchronized (srcLinks) {
223             return FluentIterable.from(srcLinks.get(deviceId))
224             .transform(lookupLink())
225             .filter(notNull())
226             .toSet();
227         }
228     }
229
230     @Override
231     public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
232         // lock for iteration
233         synchronized (dstLinks) {
234             return FluentIterable.from(dstLinks.get(deviceId))
235             .transform(lookupLink())
236             .filter(notNull())
237             .toSet();
238         }
239     }
240
241     @Override
242     public Link getLink(ConnectPoint src, ConnectPoint dst) {
243         return links.get(linkKey(src, dst));
244     }
245
246     @Override
247     public Set<Link> getEgressLinks(ConnectPoint src) {
248         Set<Link> egress = new HashSet<>();
249         //
250         // Change `srcLinks` to ConcurrentMap<DeviceId, (Concurrent)Set>
251         // to remove this synchronized block, if we hit performance issue.
252         // SetMultiMap#get returns wrapped collection to provide modifiable-view.
253         // And the wrapped collection is not concurrent access safe.
254         //
255         // Our use case here does not require returned collection to be modifiable,
256         // so the wrapped collection forces us to lock the whole multiset,
257         // for benefit we don't need.
258         //
259         // Same applies to `dstLinks`
260         synchronized (srcLinks) {
261             for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
262                 if (linkKey.src().equals(src)) {
263                     Link link = links.get(linkKey);
264                     if (link != null) {
265                         egress.add(link);
266                     } else {
267                         log.debug("Egress link for {} was null, skipped", linkKey);
268                     }
269                 }
270             }
271         }
272         return egress;
273     }
274
275     @Override
276     public Set<Link> getIngressLinks(ConnectPoint dst) {
277         Set<Link> ingress = new HashSet<>();
278         synchronized (dstLinks) {
279             for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
280                 if (linkKey.dst().equals(dst)) {
281                     Link link = links.get(linkKey);
282                     if (link != null) {
283                         ingress.add(link);
284                     } else {
285                         log.debug("Ingress link for {} was null, skipped", linkKey);
286                     }
287                 }
288             }
289         }
290         return ingress;
291     }
292
293     @Override
294     public LinkEvent createOrUpdateLink(ProviderId providerId,
295                                         LinkDescription linkDescription) {
296
297         final DeviceId dstDeviceId = linkDescription.dst().deviceId();
298         final NodeId localNode = clusterService.getLocalNode().id();
299         final NodeId dstNode = mastershipService.getMasterFor(dstDeviceId);
300
301         // Process link update only if we're the master of the destination node,
302         // otherwise signal the actual master.
303         LinkEvent linkEvent = null;
304         if (localNode.equals(dstNode)) {
305
306             Timestamp newTimestamp = deviceClockService.getTimestamp(dstDeviceId);
307
308             final Timestamped<LinkDescription> deltaDesc = new Timestamped<>(linkDescription, newTimestamp);
309
310             LinkKey key = linkKey(linkDescription.src(), linkDescription.dst());
311             final Timestamped<LinkDescription> mergedDesc;
312             Map<ProviderId, Timestamped<LinkDescription>> map = getOrCreateLinkDescriptions(key);
313
314             synchronized (map) {
315                 linkEvent = createOrUpdateLinkInternal(providerId, deltaDesc);
316                 mergedDesc = map.get(providerId);
317             }
318
319             if (linkEvent != null) {
320                 log.debug("Notifying peers of a link update topology event from providerId: "
321                                 + "{}  between src: {} and dst: {}",
322                         providerId, linkDescription.src(), linkDescription.dst());
323                 notifyPeers(new InternalLinkEvent(providerId, mergedDesc));
324             }
325
326         } else {
327             // FIXME Temporary hack for NPE (ONOS-1171).
328             // Proper fix is to implement forwarding to master on ConfigProvider
329             // redo ONOS-490
330             if (dstNode == null) {
331                 // silently ignore
332                 return null;
333             }
334
335
336             LinkInjectedEvent linkInjectedEvent = new LinkInjectedEvent(providerId, linkDescription);
337
338             // TODO check unicast return value
339             clusterCommunicator.unicast(linkInjectedEvent,
340                     GossipLinkStoreMessageSubjects.LINK_INJECTED,
341                     SERIALIZER::encode,
342                     dstNode);
343         }
344
345         return linkEvent;
346     }
347
348     @Override
349     public LinkEvent removeOrDownLink(ConnectPoint src, ConnectPoint dst) {
350         Link link = getLink(src, dst);
351         if (link == null) {
352             return null;
353         }
354
355         if (link.isDurable()) {
356             // FIXME: this is not the right thing to call for the gossip store; will not sync link state!!!
357             return link.state() == INACTIVE ? null :
358                     updateLink(linkKey(link.src(), link.dst()), link,
359                                new DefaultLink(link.providerId(),
360                                                link.src(), link.dst(),
361                                                link.type(), INACTIVE,
362                                                link.isDurable(),
363                                                link.annotations()));
364         }
365         return removeLink(src, dst);
366     }
367
368     private LinkEvent createOrUpdateLinkInternal(
369             ProviderId providerId,
370             Timestamped<LinkDescription> linkDescription) {
371
372         final LinkKey key = linkKey(linkDescription.value().src(),
373                 linkDescription.value().dst());
374         Map<ProviderId, Timestamped<LinkDescription>> descs = getOrCreateLinkDescriptions(key);
375
376         synchronized (descs) {
377             // if the link was previously removed, we should proceed if and
378             // only if this request is more recent.
379             Timestamp linkRemovedTimestamp = removedLinks.get(key);
380             if (linkRemovedTimestamp != null) {
381                 if (linkDescription.isNewerThan(linkRemovedTimestamp)) {
382                     removedLinks.remove(key);
383                 } else {
384                     log.trace("Link {} was already removed ignoring.", key);
385                     return null;
386                 }
387             }
388
389             final Link oldLink = links.get(key);
390             // update description
391             createOrUpdateLinkDescription(descs, providerId, linkDescription);
392             final Link newLink = composeLink(descs);
393             if (oldLink == null) {
394                 return createLink(key, newLink);
395             }
396             return updateLink(key, oldLink, newLink);
397         }
398     }
399
400     // Guarded by linkDescs value (=locking each Link)
401     private Timestamped<LinkDescription> createOrUpdateLinkDescription(
402             Map<ProviderId, Timestamped<LinkDescription>> descs,
403             ProviderId providerId,
404             Timestamped<LinkDescription> linkDescription) {
405
406         // merge existing annotations
407         Timestamped<LinkDescription> existingLinkDescription = descs.get(providerId);
408         if (existingLinkDescription != null && existingLinkDescription.isNewer(linkDescription)) {
409             log.trace("local info is more up-to-date, ignoring {}.", linkDescription);
410             return null;
411         }
412         Timestamped<LinkDescription> newLinkDescription = linkDescription;
413         if (existingLinkDescription != null) {
414             // we only allow transition from INDIRECT -> DIRECT
415             final Type newType;
416             if (existingLinkDescription.value().type() == DIRECT) {
417                 newType = DIRECT;
418             } else {
419                 newType = linkDescription.value().type();
420             }
421             SparseAnnotations merged = union(existingLinkDescription.value().annotations(),
422                     linkDescription.value().annotations());
423             newLinkDescription = new Timestamped<>(
424                     new DefaultLinkDescription(
425                         linkDescription.value().src(),
426                         linkDescription.value().dst(),
427                         newType, merged),
428                     linkDescription.timestamp());
429         }
430         return descs.put(providerId, newLinkDescription);
431     }
432
433     // Creates and stores the link and returns the appropriate event.
434     // Guarded by linkDescs value (=locking each Link)
435     private LinkEvent createLink(LinkKey key, Link newLink) {
436         links.put(key, newLink);
437         srcLinks.put(newLink.src().deviceId(), key);
438         dstLinks.put(newLink.dst().deviceId(), key);
439         return new LinkEvent(LINK_ADDED, newLink);
440     }
441
442     // Updates, if necessary the specified link and returns the appropriate event.
443     // Guarded by linkDescs value (=locking each Link)
444     private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
445         // Note: INDIRECT -> DIRECT transition only
446         // so that BDDP discovered Link will not overwrite LDDP Link
447         if (oldLink.state() != newLink.state() ||
448             (oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
449             !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
450
451             links.put(key, newLink);
452             // strictly speaking following can be omitted
453             srcLinks.put(oldLink.src().deviceId(), key);
454             dstLinks.put(oldLink.dst().deviceId(), key);
455             return new LinkEvent(LINK_UPDATED, newLink);
456         }
457         return null;
458     }
459
460     @Override
461     public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
462         final LinkKey key = linkKey(src, dst);
463
464         DeviceId dstDeviceId = dst.deviceId();
465         Timestamp timestamp = null;
466         try {
467             timestamp = deviceClockService.getTimestamp(dstDeviceId);
468         } catch (IllegalStateException e) {
469             log.debug("Failed to remove link {}, was not the master", key);
470             // there are times when this is called before mastership
471             // handoff correctly completes.
472             return null;
473         }
474
475         LinkEvent event = removeLinkInternal(key, timestamp);
476
477         if (event != null) {
478             log.debug("Notifying peers of a link removed topology event for a link "
479                     + "between src: {} and dst: {}", src, dst);
480             notifyPeers(new InternalLinkRemovedEvent(key, timestamp));
481         }
482         return event;
483     }
484
485     private static Timestamped<LinkDescription> getPrimaryDescription(
486                 Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
487
488         synchronized (linkDescriptions) {
489             for (Entry<ProviderId, Timestamped<LinkDescription>>
490                     e : linkDescriptions.entrySet()) {
491
492                 if (!e.getKey().isAncillary()) {
493                     return e.getValue();
494                 }
495             }
496         }
497         return null;
498     }
499
500
501     // TODO: consider slicing out as Timestamp utils
502     /**
503      * Checks is timestamp is more recent than timestamped object.
504      *
505      * @param timestamp to check if this is more recent then other
506      * @param timestamped object to be tested against
507      * @return true if {@code timestamp} is more recent than {@code timestamped}
508      *         or {@code timestamped is null}
509      */
510     private static boolean isMoreRecent(Timestamp timestamp, Timestamped<?> timestamped) {
511         checkNotNull(timestamp);
512         if (timestamped == null) {
513             return true;
514         }
515         return timestamp.compareTo(timestamped.timestamp()) > 0;
516     }
517
518     private LinkEvent removeLinkInternal(LinkKey key, Timestamp timestamp) {
519         Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions
520             = getOrCreateLinkDescriptions(key);
521
522         synchronized (linkDescriptions) {
523             if (linkDescriptions.isEmpty()) {
524                 // never seen such link before. keeping timestamp for record
525                 removedLinks.put(key, timestamp);
526                 return null;
527             }
528             // accept removal request if given timestamp is newer than
529             // the latest Timestamp from Primary provider
530             Timestamped<LinkDescription> prim = getPrimaryDescription(linkDescriptions);
531             if (!isMoreRecent(timestamp, prim)) {
532                 // outdated remove request, ignore
533                 return null;
534             }
535             removedLinks.put(key, timestamp);
536             Link link = links.remove(key);
537             linkDescriptions.clear();
538             if (link != null) {
539                 srcLinks.remove(link.src().deviceId(), key);
540                 dstLinks.remove(link.dst().deviceId(), key);
541                 return new LinkEvent(LINK_REMOVED, link);
542             }
543             return null;
544         }
545     }
546
547     /**
548      * Creates concurrent readable, synchronized HashMultimap.
549      *
550      * @return SetMultimap
551      */
552     private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
553         return synchronizedSetMultimap(
554                Multimaps.newSetMultimap(new ConcurrentHashMap<K, Collection<V>>(),
555                                        () -> Sets.newConcurrentHashSet()));
556     }
557
558     /**
559      * @return primary ProviderID, or randomly chosen one if none exists
560      */
561     private static ProviderId pickBaseProviderId(
562             Map<ProviderId, Timestamped<LinkDescription>> linkDescriptions) {
563
564         ProviderId fallBackPrimary = null;
565         for (Entry<ProviderId, Timestamped<LinkDescription>> e : linkDescriptions.entrySet()) {
566             if (!e.getKey().isAncillary()) {
567                 // found primary
568                 return e.getKey();
569             } else if (fallBackPrimary == null) {
570                 // pick randomly as a fallback in case there is no primary
571                 fallBackPrimary = e.getKey();
572             }
573         }
574         return fallBackPrimary;
575     }
576
577     // Guarded by linkDescs value (=locking each Link)
578     private Link composeLink(Map<ProviderId, Timestamped<LinkDescription>> descs) {
579         ProviderId baseProviderId = pickBaseProviderId(descs);
580         Timestamped<LinkDescription> base = descs.get(baseProviderId);
581
582         ConnectPoint src = base.value().src();
583         ConnectPoint dst = base.value().dst();
584         Type type = base.value().type();
585         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
586         annotations = merge(annotations, base.value().annotations());
587
588         for (Entry<ProviderId, Timestamped<LinkDescription>> e : descs.entrySet()) {
589             if (baseProviderId.equals(e.getKey())) {
590                 continue;
591             }
592
593             // Note: In the long run we should keep track of Description timestamp
594             // and only merge conflicting keys when timestamp is newer
595             // Currently assuming there will never be a key conflict between
596             // providers
597
598             // annotation merging. not so efficient, should revisit later
599             annotations = merge(annotations, e.getValue().value().annotations());
600         }
601
602         boolean isDurable = Objects.equals(annotations.value(AnnotationKeys.DURABLE), "true");
603         return new DefaultLink(baseProviderId, src, dst, type, ACTIVE, isDurable, annotations);
604     }
605
606     private Map<ProviderId, Timestamped<LinkDescription>> getOrCreateLinkDescriptions(LinkKey key) {
607         Map<ProviderId, Timestamped<LinkDescription>> r;
608         r = linkDescs.get(key);
609         if (r != null) {
610             return r;
611         }
612         r = new HashMap<>();
613         final Map<ProviderId, Timestamped<LinkDescription>> concurrentlyAdded;
614         concurrentlyAdded = linkDescs.putIfAbsent(key, r);
615         if (concurrentlyAdded != null) {
616             return concurrentlyAdded;
617         } else {
618             return r;
619         }
620     }
621
622     private final Function<LinkKey, Link> lookupLink = new LookupLink();
623
624     /**
625      * Returns a Function to lookup Link instance using LinkKey from cache.
626      *
627      * @return lookup link function
628      */
629     private Function<LinkKey, Link> lookupLink() {
630         return lookupLink;
631     }
632
633     private final class LookupLink implements Function<LinkKey, Link> {
634         @Override
635         public Link apply(LinkKey input) {
636             if (input == null) {
637                 return null;
638             } else {
639                 return links.get(input);
640             }
641         }
642     }
643
644     private void notifyDelegateIfNotNull(LinkEvent event) {
645         if (event != null) {
646             notifyDelegate(event);
647         }
648     }
649
650     private void broadcastMessage(MessageSubject subject, Object event) {
651         clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
652     }
653
654     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
655         clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
656     }
657
658     private void notifyPeers(InternalLinkEvent event) {
659         broadcastMessage(GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
660     }
661
662     private void notifyPeers(InternalLinkRemovedEvent event) {
663         broadcastMessage(GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
664     }
665
666     // notify peer, silently ignoring error
667     private void notifyPeer(NodeId peer, InternalLinkEvent event) {
668         try {
669             unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_UPDATE, event);
670         } catch (IOException e) {
671             log.debug("Failed to notify peer {} with message {}", peer, event);
672         }
673     }
674
675     // notify peer, silently ignoring error
676     private void notifyPeer(NodeId peer, InternalLinkRemovedEvent event) {
677         try {
678             unicastMessage(peer, GossipLinkStoreMessageSubjects.LINK_REMOVED, event);
679         } catch (IOException e) {
680             log.debug("Failed to notify peer {} with message {}", peer, event);
681         }
682     }
683
684     private final class SendAdvertisementTask implements Runnable {
685
686         @Override
687         public void run() {
688             if (Thread.currentThread().isInterrupted()) {
689                 log.debug("Interrupted, quitting");
690                 return;
691             }
692
693             try {
694                 final NodeId self = clusterService.getLocalNode().id();
695                 Set<ControllerNode> nodes = clusterService.getNodes();
696
697                 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
698                         .transform(toNodeId())
699                         .toList();
700
701                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
702                     log.trace("No other peers in the cluster.");
703                     return;
704                 }
705
706                 NodeId peer;
707                 do {
708                     int idx = RandomUtils.nextInt(0, nodeIds.size());
709                     peer = nodeIds.get(idx);
710                 } while (peer.equals(self));
711
712                 LinkAntiEntropyAdvertisement ad = createAdvertisement();
713
714                 if (Thread.currentThread().isInterrupted()) {
715                     log.debug("Interrupted, quitting");
716                     return;
717                 }
718
719                 try {
720                     unicastMessage(peer, LINK_ANTI_ENTROPY_ADVERTISEMENT, ad);
721                 } catch (IOException e) {
722                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
723                     return;
724                 }
725             } catch (Exception e) {
726                 // catch all Exception to avoid Scheduled task being suppressed.
727                 log.error("Exception thrown while sending advertisement", e);
728             }
729         }
730     }
731
732     private LinkAntiEntropyAdvertisement createAdvertisement() {
733         final NodeId self = clusterService.getLocalNode().id();
734
735         Map<LinkFragmentId, Timestamp> linkTimestamps = new HashMap<>(linkDescs.size());
736         Map<LinkKey, Timestamp> linkTombstones = new HashMap<>(removedLinks.size());
737
738         linkDescs.forEach((linkKey, linkDesc) -> {
739             synchronized (linkDesc) {
740                 for (Map.Entry<ProviderId, Timestamped<LinkDescription>> e : linkDesc.entrySet()) {
741                     linkTimestamps.put(new LinkFragmentId(linkKey, e.getKey()), e.getValue().timestamp());
742                 }
743             }
744         });
745
746         linkTombstones.putAll(removedLinks);
747
748         return new LinkAntiEntropyAdvertisement(self, linkTimestamps, linkTombstones);
749     }
750
751     private void handleAntiEntropyAdvertisement(LinkAntiEntropyAdvertisement ad) {
752
753         final NodeId sender = ad.sender();
754         boolean localOutdated = false;
755
756         for (Entry<LinkKey, Map<ProviderId, Timestamped<LinkDescription>>>
757                 l : linkDescs.entrySet()) {
758
759             final LinkKey key = l.getKey();
760             final Map<ProviderId, Timestamped<LinkDescription>> link = l.getValue();
761             synchronized (link) {
762                 Timestamp localLatest = removedLinks.get(key);
763
764                 for (Entry<ProviderId, Timestamped<LinkDescription>> p : link.entrySet()) {
765                     final ProviderId providerId = p.getKey();
766                     final Timestamped<LinkDescription> pDesc = p.getValue();
767
768                     final LinkFragmentId fragId = new LinkFragmentId(key, providerId);
769                     // remote
770                     Timestamp remoteTimestamp = ad.linkTimestamps().get(fragId);
771                     if (remoteTimestamp == null) {
772                         remoteTimestamp = ad.linkTombstones().get(key);
773                     }
774                     if (remoteTimestamp == null ||
775                         pDesc.isNewerThan(remoteTimestamp)) {
776                         // I have more recent link description. update peer.
777                         notifyPeer(sender, new InternalLinkEvent(providerId, pDesc));
778                     } else {
779                         final Timestamp remoteLive = ad.linkTimestamps().get(fragId);
780                         if (remoteLive != null &&
781                             remoteLive.compareTo(pDesc.timestamp()) > 0) {
782                             // I have something outdated
783                             localOutdated = true;
784                         }
785                     }
786
787                     // search local latest along the way
788                     if (localLatest == null ||
789                         pDesc.isNewerThan(localLatest)) {
790                         localLatest = pDesc.timestamp();
791                     }
792                 }
793                 // Tests if remote remove is more recent then local latest.
794                 final Timestamp remoteRemove = ad.linkTombstones().get(key);
795                 if (remoteRemove != null) {
796                     if (localLatest != null &&
797                         localLatest.compareTo(remoteRemove) < 0) {
798                         // remote remove is more recent
799                         notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
800                     }
801                 }
802             }
803         }
804
805         // populate remove info if not known locally
806         for (Entry<LinkKey, Timestamp> remoteRm : ad.linkTombstones().entrySet()) {
807             final LinkKey key = remoteRm.getKey();
808             final Timestamp remoteRemove = remoteRm.getValue();
809             // relying on removeLinkInternal to ignore stale info
810             notifyDelegateIfNotNull(removeLinkInternal(key, remoteRemove));
811         }
812
813         if (localOutdated) {
814             // send back advertisement to speed up convergence
815             try {
816                 unicastMessage(sender, LINK_ANTI_ENTROPY_ADVERTISEMENT,
817                                 createAdvertisement());
818             } catch (IOException e) {
819                 log.debug("Failed to send back active advertisement");
820             }
821         }
822     }
823
824     private final class InternalLinkEventListener
825             implements ClusterMessageHandler {
826         @Override
827         public void handle(ClusterMessage message) {
828
829             log.trace("Received link event from peer: {}", message.sender());
830             InternalLinkEvent event = (InternalLinkEvent) SERIALIZER.decode(message.payload());
831
832             ProviderId providerId = event.providerId();
833             Timestamped<LinkDescription> linkDescription = event.linkDescription();
834
835             try {
836                 notifyDelegateIfNotNull(createOrUpdateLinkInternal(providerId, linkDescription));
837             } catch (Exception e) {
838                 log.warn("Exception thrown handling link event", e);
839             }
840         }
841     }
842
843     private final class InternalLinkRemovedEventListener
844             implements ClusterMessageHandler {
845         @Override
846         public void handle(ClusterMessage message) {
847
848             log.trace("Received link removed event from peer: {}", message.sender());
849             InternalLinkRemovedEvent event = (InternalLinkRemovedEvent) SERIALIZER.decode(message.payload());
850
851             LinkKey linkKey = event.linkKey();
852             Timestamp timestamp = event.timestamp();
853
854             try {
855                 notifyDelegateIfNotNull(removeLinkInternal(linkKey, timestamp));
856             } catch (Exception e) {
857                 log.warn("Exception thrown handling link removed", e);
858             }
859         }
860     }
861
862     private final class InternalLinkAntiEntropyAdvertisementListener
863             implements ClusterMessageHandler {
864
865         @Override
866         public void handle(ClusterMessage message) {
867             log.trace("Received Link Anti-Entropy advertisement from peer: {}", message.sender());
868             LinkAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
869             try {
870                 handleAntiEntropyAdvertisement(advertisement);
871             } catch (Exception e) {
872                 log.warn("Exception thrown while handling Link advertisements", e);
873                 throw e;
874             }
875         }
876     }
877
878     private final class LinkInjectedEventListener
879             implements ClusterMessageHandler {
880         @Override
881         public void handle(ClusterMessage message) {
882
883             log.trace("Received injected link event from peer: {}", message.sender());
884             LinkInjectedEvent linkInjectedEvent = SERIALIZER.decode(message.payload());
885
886             ProviderId providerId = linkInjectedEvent.providerId();
887             LinkDescription linkDescription = linkInjectedEvent.linkDescription();
888
889             final DeviceId deviceId = linkDescription.dst().deviceId();
890             if (!deviceClockService.isTimestampAvailable(deviceId)) {
891                 // workaround for ONOS-1208
892                 log.warn("Not ready to accept update. Dropping {}", linkDescription);
893                 return;
894             }
895
896             try {
897                 createOrUpdateLink(providerId, linkDescription);
898             } catch (Exception e) {
899                 log.warn("Exception thrown while handling link injected event", e);
900             }
901         }
902     }
903 }