973db494b2376f59de18ff24342bfe6fa3341c8e
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.device.impl;
17
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
22
23 import org.apache.commons.lang3.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onlab.packet.ChassisId;
31 import org.onlab.util.KryoNamespace;
32 import org.onlab.util.NewConcurrentHashMap;
33 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.mastership.MastershipTerm;
38 import org.onosproject.mastership.MastershipTermService;
39 import org.onosproject.net.Annotations;
40 import org.onosproject.net.AnnotationsUtil;
41 import org.onosproject.net.DefaultAnnotations;
42 import org.onosproject.net.DefaultDevice;
43 import org.onosproject.net.DefaultPort;
44 import org.onosproject.net.Device;
45 import org.onosproject.net.Device.Type;
46 import org.onosproject.net.DeviceId;
47 import org.onosproject.net.MastershipRole;
48 import org.onosproject.net.OchPort;
49 import org.onosproject.net.OduCltPort;
50 import org.onosproject.net.OmsPort;
51 import org.onosproject.net.Port;
52 import org.onosproject.net.PortNumber;
53 import org.onosproject.net.device.DefaultPortStatistics;
54 import org.onosproject.net.device.DeviceClockService;
55 import org.onosproject.net.device.DeviceDescription;
56 import org.onosproject.net.device.DeviceEvent;
57 import org.onosproject.net.device.DeviceStore;
58 import org.onosproject.net.device.DeviceStoreDelegate;
59 import org.onosproject.net.device.OchPortDescription;
60 import org.onosproject.net.device.OduCltPortDescription;
61 import org.onosproject.net.device.OmsPortDescription;
62 import org.onosproject.net.device.PortDescription;
63 import org.onosproject.net.device.PortStatistics;
64 import org.onosproject.net.provider.ProviderId;
65 import org.onosproject.store.AbstractStore;
66 import org.onosproject.store.Timestamp;
67 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
68 import org.onosproject.store.cluster.messaging.ClusterMessage;
69 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
70 import org.onosproject.store.cluster.messaging.MessageSubject;
71 import org.onosproject.store.impl.Timestamped;
72 import org.onosproject.store.serializers.KryoNamespaces;
73 import org.onosproject.store.serializers.KryoSerializer;
74 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 import org.onosproject.store.service.EventuallyConsistentMap;
76 import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 import org.onosproject.store.service.EventuallyConsistentMapListener;
78 import org.onosproject.store.service.MultiValuedTimestamp;
79 import org.onosproject.store.service.StorageService;
80 import org.onosproject.store.service.WallClockTimestamp;
81 import org.slf4j.Logger;
82
83 import java.io.IOException;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.List;
91 import java.util.Map;
92 import java.util.Map.Entry;
93 import java.util.Objects;
94 import java.util.Set;
95 import java.util.concurrent.ConcurrentMap;
96 import java.util.concurrent.ExecutorService;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledExecutorService;
99 import java.util.concurrent.TimeUnit;
100
101 import static com.google.common.base.Preconditions.checkArgument;
102 import static com.google.common.base.Predicates.notNull;
103 import static com.google.common.base.Verify.verify;
104 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
105 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
106 import static org.onlab.util.Tools.groupedThreads;
107 import static org.onlab.util.Tools.minPriority;
108 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
109 import static org.onosproject.net.DefaultAnnotations.merge;
110 import static org.onosproject.net.device.DeviceEvent.Type.*;
111 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
112 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
113 import static org.slf4j.LoggerFactory.getLogger;
114
115 /**
116  * Manages inventory of infrastructure devices using gossip protocol to distribute
117  * information.
118  */
119 @Component(immediate = true)
120 @Service
121 public class GossipDeviceStore
122         extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
123         implements DeviceStore {
124
125     private final Logger log = getLogger(getClass());
126
127     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
128     // Timeout in milliseconds to process device or ports on remote master node
129     private static final int REMOTE_MASTER_TIMEOUT = 1000;
130
131     // innerMap is used to lock a Device, thus instance should never be replaced.
132     // collection of Description given from various providers
133     private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
134             deviceDescs = Maps.newConcurrentMap();
135
136     // cache of Device and Ports generated by compositing descriptions from providers
137     private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
138     private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
139
140     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
141     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
142     private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
143             portStatsListener = new InternalPortStatsListener();
144
145     // to be updated under Device lock
146     private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
147     private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
148
149     // available(=UP) devices
150     private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
151
152     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
153     protected DeviceClockService deviceClockService;
154
155     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
156     protected StorageService storageService;
157
158     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
159     protected ClusterCommunicationService clusterCommunicator;
160
161     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
162     protected ClusterService clusterService;
163
164     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
165     protected MastershipService mastershipService;
166
167     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168     protected MastershipTermService termService;
169
170
171     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
172         @Override
173         protected void setupKryoPool() {
174             serializerPool = KryoNamespace.newBuilder()
175                     .register(DistributedStoreSerializers.STORE_COMMON)
176                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
177                     .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
178                     .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
179                     .register(InternalDeviceRemovedEvent.class)
180                     .register(new InternalPortEventSerializer(), InternalPortEvent.class)
181                     .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
182                     .register(DeviceAntiEntropyAdvertisement.class)
183                     .register(DeviceFragmentId.class)
184                     .register(PortFragmentId.class)
185                     .register(DeviceInjectedEvent.class)
186                     .register(PortInjectedEvent.class)
187                     .build();
188         }
189     };
190
191     private ExecutorService executor;
192
193     private ScheduledExecutorService backgroundExecutor;
194
195     // TODO make these anti-entropy parameters configurable
196     private long initialDelaySec = 5;
197     private long periodSec = 5;
198
199     @Activate
200     public void activate() {
201         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
202
203         backgroundExecutor =
204                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
205
206         clusterCommunicator.addSubscriber(
207                 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
208         clusterCommunicator.addSubscriber(
209                 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
210                 new InternalDeviceOfflineEventListener(),
211                 executor);
212         clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
213                                           new InternalRemoveRequestListener(),
214                                           executor);
215         clusterCommunicator.addSubscriber(
216                 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
217         clusterCommunicator.addSubscriber(
218                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
219         clusterCommunicator.addSubscriber(
220                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
221         clusterCommunicator.addSubscriber(
222                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
223                 new InternalDeviceAdvertisementListener(),
224                 backgroundExecutor);
225         clusterCommunicator.addSubscriber(
226                 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
227         clusterCommunicator.addSubscriber(
228                 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
229
230         // start anti-entropy thread
231         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
232                                                initialDelaySec, periodSec, TimeUnit.SECONDS);
233
234         // Create a distributed map for port stats.
235         KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
236                 .register(KryoNamespaces.API)
237                 .register(DefaultPortStatistics.class)
238                 .register(DeviceId.class)
239                 .register(MultiValuedTimestamp.class)
240                 .register(WallClockTimestamp.class);
241
242         devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
243                 .withName("port-stats")
244                 .withSerializer(deviceDataSerializer)
245                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247                 .withTombstonesDisabled()
248                 .build();
249         devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
250                 eventuallyConsistentMapBuilder()
251                 .withName("port-stats-delta")
252                 .withSerializer(deviceDataSerializer)
253                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
254                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
255                 .withTombstonesDisabled()
256                 .build();
257         devicePortStats.addListener(portStatsListener);
258         log.info("Started");
259     }
260
261     @Deactivate
262     public void deactivate() {
263         devicePortStats.destroy();
264         devicePortDeltaStats.destroy();
265         executor.shutdownNow();
266
267         backgroundExecutor.shutdownNow();
268         try {
269             if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
270                 log.error("Timeout during executor shutdown");
271             }
272         } catch (InterruptedException e) {
273             log.error("Error during executor shutdown", e);
274         }
275
276         deviceDescs.clear();
277         devices.clear();
278         devicePorts.clear();
279         availableDevices.clear();
280         log.info("Stopped");
281     }
282
283     @Override
284     public int getDeviceCount() {
285         return devices.size();
286     }
287
288     @Override
289     public Iterable<Device> getDevices() {
290         return Collections.unmodifiableCollection(devices.values());
291     }
292
293     @Override
294     public Iterable<Device> getAvailableDevices() {
295         return FluentIterable.from(getDevices())
296                 .filter(input -> isAvailable(input.id()));
297     }
298
299     @Override
300     public Device getDevice(DeviceId deviceId) {
301         return devices.get(deviceId);
302     }
303
304     @Override
305     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
306                                                          DeviceId deviceId,
307                                                          DeviceDescription deviceDescription) {
308         NodeId localNode = clusterService.getLocalNode().id();
309         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
310
311         // Process device update only if we're the master,
312         // otherwise signal the actual master.
313         DeviceEvent deviceEvent = null;
314         if (localNode.equals(deviceNode)) {
315
316             final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
317             final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
318             final Timestamped<DeviceDescription> mergedDesc;
319             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
320
321             synchronized (device) {
322                 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
323                 mergedDesc = device.get(providerId).getDeviceDesc();
324             }
325
326             if (deviceEvent != null) {
327                 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
328                          providerId, deviceId);
329                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
330             }
331
332         } else {
333             // FIXME Temporary hack for NPE (ONOS-1171).
334             // Proper fix is to implement forwarding to master on ConfigProvider
335             // redo ONOS-490
336             if (deviceNode == null) {
337                 // silently ignore
338                 return null;
339             }
340
341
342             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
343                     providerId, deviceId, deviceDescription);
344
345             // TODO check unicast return value
346             clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
347             /* error log:
348             log.warn("Failed to process injected device id: {} desc: {} " +
349                             "(cluster messaging failed: {})",
350                     deviceId, deviceDescription, e);
351             */
352         }
353
354         return deviceEvent;
355     }
356
357     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
358                                                      DeviceId deviceId,
359                                                      Timestamped<DeviceDescription> deltaDesc) {
360
361         // Collection of DeviceDescriptions for a Device
362         Map<ProviderId, DeviceDescriptions> device
363                 = getOrCreateDeviceDescriptionsMap(deviceId);
364
365         synchronized (device) {
366             // locking per device
367
368             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
369                 log.debug("Ignoring outdated event: {}", deltaDesc);
370                 return null;
371             }
372
373             DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
374
375             final Device oldDevice = devices.get(deviceId);
376             final Device newDevice;
377
378             if (deltaDesc == descs.getDeviceDesc() ||
379                     deltaDesc.isNewer(descs.getDeviceDesc())) {
380                 // on new device or valid update
381                 descs.putDeviceDesc(deltaDesc);
382                 newDevice = composeDevice(deviceId, device);
383             } else {
384                 // outdated event, ignored.
385                 return null;
386             }
387             if (oldDevice == null) {
388                 // ADD
389                 return createDevice(providerId, newDevice, deltaDesc.timestamp());
390             } else {
391                 // UPDATE or ignore (no change or stale)
392                 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
393             }
394         }
395     }
396
397     // Creates the device and returns the appropriate event if necessary.
398     // Guarded by deviceDescs value (=Device lock)
399     private DeviceEvent createDevice(ProviderId providerId,
400                                      Device newDevice, Timestamp timestamp) {
401
402         // update composed device cache
403         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
404         verify(oldDevice == null,
405                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
406                providerId, oldDevice, newDevice);
407
408         if (!providerId.isAncillary()) {
409             markOnline(newDevice.id(), timestamp);
410         }
411
412         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
413     }
414
415     // Updates the device and returns the appropriate event if necessary.
416     // Guarded by deviceDescs value (=Device lock)
417     private DeviceEvent updateDevice(ProviderId providerId,
418                                      Device oldDevice,
419                                      Device newDevice, Timestamp newTimestamp) {
420         // We allow only certain attributes to trigger update
421         boolean propertiesChanged =
422                 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
423                         !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
424                         !Objects.equals(oldDevice.providerId(), newDevice.providerId());
425         boolean annotationsChanged =
426                 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
427
428         // Primary providers can respond to all changes, but ancillary ones
429         // should respond only to annotation changes.
430         if ((providerId.isAncillary() && annotationsChanged) ||
431                 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
432             boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
433             if (!replaced) {
434                 verify(replaced,
435                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
436                        providerId, oldDevice, devices.get(newDevice.id())
437                         , newDevice);
438             }
439             if (!providerId.isAncillary()) {
440                 boolean wasOnline = availableDevices.contains(newDevice.id());
441                 markOnline(newDevice.id(), newTimestamp);
442                 if (!wasOnline) {
443                     notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
444                 }
445             }
446
447             return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
448         }
449         return null;
450     }
451
452     @Override
453     public DeviceEvent markOffline(DeviceId deviceId) {
454         final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
455         final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
456         if (event != null) {
457             log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
458                      deviceId, timestamp);
459             notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
460         }
461         return event;
462     }
463
464     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
465
466         Map<ProviderId, DeviceDescriptions> providerDescs
467                 = getOrCreateDeviceDescriptionsMap(deviceId);
468
469         // locking device
470         synchronized (providerDescs) {
471
472             // accept off-line if given timestamp is newer than
473             // the latest Timestamp from Primary provider
474             DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
475             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
476             if (timestamp.compareTo(lastTimestamp) <= 0) {
477                 // outdated event ignore
478                 return null;
479             }
480
481             offline.put(deviceId, timestamp);
482
483             Device device = devices.get(deviceId);
484             if (device == null) {
485                 return null;
486             }
487             boolean removed = availableDevices.remove(deviceId);
488             if (removed) {
489                 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
490             }
491             return null;
492         }
493     }
494
495     /**
496      * Marks the device as available if the given timestamp is not outdated,
497      * compared to the time the device has been marked offline.
498      *
499      * @param deviceId  identifier of the device
500      * @param timestamp of the event triggering this change.
501      * @return true if availability change request was accepted and changed the state
502      */
503     // Guarded by deviceDescs value (=Device lock)
504     private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
505         // accept on-line if given timestamp is newer than
506         // the latest offline request Timestamp
507         Timestamp offlineTimestamp = offline.get(deviceId);
508         if (offlineTimestamp == null ||
509                 offlineTimestamp.compareTo(timestamp) < 0) {
510
511             offline.remove(deviceId);
512             return availableDevices.add(deviceId);
513         }
514         return false;
515     }
516
517     @Override
518     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
519                                                       DeviceId deviceId,
520                                                       List<PortDescription> portDescriptions) {
521
522         NodeId localNode = clusterService.getLocalNode().id();
523         // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
524         // since it will trigger distributed store read.
525         // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
526         // outside Device subsystem. so that we don't have to modify both Device and Link stores.
527         // If we don't care much about topology performance, then it might be OK.
528         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
529
530         // Process port update only if we're the master of the device,
531         // otherwise signal the actual master.
532         List<DeviceEvent> deviceEvents = null;
533         if (localNode.equals(deviceNode)) {
534
535             final Timestamp newTimestamp;
536             try {
537                 newTimestamp = deviceClockService.getTimestamp(deviceId);
538             } catch (IllegalStateException e) {
539                 log.info("Timestamp was not available for device {}", deviceId);
540                 log.debug("  discarding {}", portDescriptions);
541                 // Failed to generate timestamp.
542
543                 // Possible situation:
544                 //  Device connected and became master for short period of time,
545                 // but lost mastership before this instance had the chance to
546                 // retrieve term information.
547
548                 // Information dropped here is expected to be recoverable by
549                 // device probing after mastership change
550
551                 return Collections.emptyList();
552             }
553             log.debug("timestamp for {} {}", deviceId, newTimestamp);
554
555             final Timestamped<List<PortDescription>> timestampedInput
556                     = new Timestamped<>(portDescriptions, newTimestamp);
557             final Timestamped<List<PortDescription>> merged;
558
559             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
560
561             synchronized (device) {
562                 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
563                 final DeviceDescriptions descs = device.get(providerId);
564                 List<PortDescription> mergedList =
565                         FluentIterable.from(portDescriptions)
566                                 .transform(input ->
567                                     // lookup merged port description
568                                     descs.getPortDesc(input.portNumber()).value()
569                                 ).toList();
570                 merged = new Timestamped<>(mergedList, newTimestamp);
571             }
572
573             if (!deviceEvents.isEmpty()) {
574                 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
575                          providerId, deviceId);
576                 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
577             }
578
579         } else {
580             // FIXME Temporary hack for NPE (ONOS-1171).
581             // Proper fix is to implement forwarding to master on ConfigProvider
582             // redo ONOS-490
583             if (deviceNode == null) {
584                 // silently ignore
585                 return Collections.emptyList();
586             }
587
588             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
589
590             //TODO check unicast return value
591             clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
592             /* error log:
593             log.warn("Failed to process injected ports of device id: {} " +
594                             "(cluster messaging failed: {})",
595                     deviceId, e);
596             */
597         }
598
599         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
600     }
601
602     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
603                                                   DeviceId deviceId,
604                                                   Timestamped<List<PortDescription>> portDescriptions) {
605
606         Device device = devices.get(deviceId);
607         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
608
609         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
610         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
611
612         List<DeviceEvent> events = new ArrayList<>();
613         synchronized (descsMap) {
614
615             if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
616                 log.debug("Ignoring outdated events: {}", portDescriptions);
617                 return Collections.emptyList();
618             }
619
620             DeviceDescriptions descs = descsMap.get(providerId);
621             // every provider must provide DeviceDescription.
622             checkArgument(descs != null,
623                           "Device description for Device ID %s from Provider %s was not found",
624                           deviceId, providerId);
625
626             Map<PortNumber, Port> ports = getPortMap(deviceId);
627
628             final Timestamp newTimestamp = portDescriptions.timestamp();
629
630             // Add new ports
631             Set<PortNumber> processed = new HashSet<>();
632             for (PortDescription portDescription : portDescriptions.value()) {
633                 final PortNumber number = portDescription.portNumber();
634                 processed.add(number);
635
636                 final Port oldPort = ports.get(number);
637                 final Port newPort;
638
639
640                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
641                 if (existingPortDesc == null ||
642                         newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
643                     // on new port or valid update
644                     // update description
645                     descs.putPortDesc(new Timestamped<>(portDescription,
646                                                         portDescriptions.timestamp()));
647                     newPort = composePort(device, number, descsMap);
648                 } else {
649                     // outdated event, ignored.
650                     continue;
651                 }
652
653                 events.add(oldPort == null ?
654                                    createPort(device, newPort, ports) :
655                                    updatePort(device, oldPort, newPort, ports));
656             }
657
658             events.addAll(pruneOldPorts(device, ports, processed));
659         }
660         return FluentIterable.from(events).filter(notNull()).toList();
661     }
662
663     // Creates a new port based on the port description adds it to the map and
664     // Returns corresponding event.
665     // Guarded by deviceDescs value (=Device lock)
666     private DeviceEvent createPort(Device device, Port newPort,
667                                    Map<PortNumber, Port> ports) {
668         ports.put(newPort.number(), newPort);
669         return new DeviceEvent(PORT_ADDED, device, newPort);
670     }
671
672     // Checks if the specified port requires update and if so, it replaces the
673     // existing entry in the map and returns corresponding event.
674     // Guarded by deviceDescs value (=Device lock)
675     private DeviceEvent updatePort(Device device, Port oldPort,
676                                    Port newPort,
677                                    Map<PortNumber, Port> ports) {
678         if (oldPort.isEnabled() != newPort.isEnabled() ||
679                 oldPort.type() != newPort.type() ||
680                 oldPort.portSpeed() != newPort.portSpeed() ||
681                 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
682             ports.put(oldPort.number(), newPort);
683             return new DeviceEvent(PORT_UPDATED, device, newPort);
684         }
685         return null;
686     }
687
688     // Prunes the specified list of ports based on which ports are in the
689     // processed list and returns list of corresponding events.
690     // Guarded by deviceDescs value (=Device lock)
691     private List<DeviceEvent> pruneOldPorts(Device device,
692                                             Map<PortNumber, Port> ports,
693                                             Set<PortNumber> processed) {
694         List<DeviceEvent> events = new ArrayList<>();
695         Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
696         while (iterator.hasNext()) {
697             Entry<PortNumber, Port> e = iterator.next();
698             PortNumber portNumber = e.getKey();
699             if (!processed.contains(portNumber)) {
700                 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
701                 iterator.remove();
702             }
703         }
704         return events;
705     }
706
707     // Gets the map of ports for the specified device; if one does not already
708     // exist, it creates and registers a new one.
709     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
710         return createIfAbsentUnchecked(devicePorts, deviceId,
711                                        NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
712     }
713
714     private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
715             DeviceId deviceId) {
716         Map<ProviderId, DeviceDescriptions> r;
717         r = deviceDescs.get(deviceId);
718         if (r == null) {
719             r = new HashMap<>();
720             final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
721             concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
722             if (concurrentlyAdded != null) {
723                 r = concurrentlyAdded;
724             }
725         }
726         return r;
727     }
728
729     // Guarded by deviceDescs value (=Device lock)
730     private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
731             Map<ProviderId, DeviceDescriptions> device,
732             ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
733         synchronized (device) {
734             DeviceDescriptions r = device.get(providerId);
735             if (r == null) {
736                 r = new DeviceDescriptions(deltaDesc);
737                 device.put(providerId, r);
738             }
739             return r;
740         }
741     }
742
743     @Override
744     public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
745                                                      DeviceId deviceId,
746                                                      PortDescription portDescription) {
747         final Timestamp newTimestamp;
748         try {
749             newTimestamp = deviceClockService.getTimestamp(deviceId);
750         } catch (IllegalStateException e) {
751             log.info("Timestamp was not available for device {}", deviceId);
752             log.debug("  discarding {}", portDescription);
753             // Failed to generate timestamp. Ignoring.
754             // See updatePorts comment
755             return null;
756         }
757         final Timestamped<PortDescription> deltaDesc
758                 = new Timestamped<>(portDescription, newTimestamp);
759         final DeviceEvent event;
760         final Timestamped<PortDescription> mergedDesc;
761         final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
762         synchronized (device) {
763             event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
764             mergedDesc = device.get(providerId)
765                     .getPortDesc(portDescription.portNumber());
766         }
767         if (event != null) {
768             log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
769                      providerId, deviceId);
770             notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
771         }
772         return event;
773     }
774
775     private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
776                                                  Timestamped<PortDescription> deltaDesc) {
777         Device device = devices.get(deviceId);
778         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
779
780         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
781         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
782
783         synchronized (descsMap) {
784
785             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
786                 log.debug("Ignoring outdated event: {}", deltaDesc);
787                 return null;
788             }
789
790             DeviceDescriptions descs = descsMap.get(providerId);
791             // assuming all providers must to give DeviceDescription
792             verify(descs != null,
793                    "Device description for Device ID %s from Provider %s was not found",
794                    deviceId, providerId);
795
796             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
797             final PortNumber number = deltaDesc.value().portNumber();
798             final Port oldPort = ports.get(number);
799             final Port newPort;
800
801             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
802             if (existingPortDesc == null ||
803                     deltaDesc.isNewer(existingPortDesc)) {
804                 // on new port or valid update
805                 // update description
806                 descs.putPortDesc(deltaDesc);
807                 newPort = composePort(device, number, descsMap);
808             } else {
809                 // same or outdated event, ignored.
810                 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
811                 return null;
812             }
813
814             if (oldPort == null) {
815                 return createPort(device, newPort, ports);
816             } else {
817                 return updatePort(device, oldPort, newPort, ports);
818             }
819         }
820     }
821
822     @Override
823     public List<Port> getPorts(DeviceId deviceId) {
824         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
825         if (ports == null) {
826             return Collections.emptyList();
827         }
828         return ImmutableList.copyOf(ports.values());
829     }
830
831     @Override
832     public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
833                                             Collection<PortStatistics> newStatsCollection) {
834
835         Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
836         Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
837         Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
838
839         if (prvStatsMap != null) {
840             for (PortStatistics newStats : newStatsCollection) {
841                 PortNumber port = PortNumber.portNumber(newStats.port());
842                 PortStatistics prvStats = prvStatsMap.get(port);
843                 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
844                 PortStatistics deltaStats = builder.build();
845                 if (prvStats != null) {
846                     deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
847                 }
848                 deltaStatsMap.put(port, deltaStats);
849                 newStatsMap.put(port, newStats);
850             }
851         } else {
852             for (PortStatistics newStats : newStatsCollection) {
853                 PortNumber port = PortNumber.portNumber(newStats.port());
854                 newStatsMap.put(port, newStats);
855             }
856         }
857         devicePortDeltaStats.put(deviceId, deltaStatsMap);
858         devicePortStats.put(deviceId, newStatsMap);
859         // DeviceEvent returns null because of InternalPortStatsListener usage
860         return null;
861     }
862
863     /**
864      * Calculate delta statistics by subtracting previous from new statistics.
865      *
866      * @param deviceId device identifier
867      * @param prvStats previous port statistics
868      * @param newStats new port statistics
869      * @return PortStatistics
870      */
871     public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
872         // calculate time difference
873         long deltaStatsSec, deltaStatsNano;
874         if (newStats.durationNano() < prvStats.durationNano()) {
875             deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
876             deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
877         } else {
878             deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
879             deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
880         }
881         DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
882         DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
883                 .setPort(newStats.port())
884                 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
885                 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
886                 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
887                 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
888                 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
889                 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
890                 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
891                 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
892                 .setDurationSec(deltaStatsSec)
893                 .setDurationNano(deltaStatsNano)
894                 .build();
895         return deltaStats;
896     }
897
898     @Override
899     public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
900         Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
901         if (portStats == null) {
902             return Collections.emptyList();
903         }
904         return ImmutableList.copyOf(portStats.values());
905     }
906
907     @Override
908     public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
909         Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
910         if (portStats == null) {
911             return Collections.emptyList();
912         }
913         return ImmutableList.copyOf(portStats.values());
914     }
915
916     @Override
917     public Port getPort(DeviceId deviceId, PortNumber portNumber) {
918         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
919         return ports == null ? null : ports.get(portNumber);
920     }
921
922     @Override
923     public boolean isAvailable(DeviceId deviceId) {
924         return availableDevices.contains(deviceId);
925     }
926
927     @Override
928     public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
929         final NodeId myId = clusterService.getLocalNode().id();
930         NodeId master = mastershipService.getMasterFor(deviceId);
931
932         // if there exist a master, forward
933         // if there is no master, try to become one and process
934
935         boolean relinquishAtEnd = false;
936         if (master == null) {
937             final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
938             if (myRole != MastershipRole.NONE) {
939                 relinquishAtEnd = true;
940             }
941             log.debug("Temporarily requesting role for {} to remove", deviceId);
942             mastershipService.requestRoleFor(deviceId);
943             MastershipTerm term = termService.getMastershipTerm(deviceId);
944             if (term != null && myId.equals(term.master())) {
945                 master = myId;
946             }
947         }
948
949         if (!myId.equals(master)) {
950             log.debug("{} has control of {}, forwarding remove request",
951                       master, deviceId);
952
953             // TODO check unicast return value
954             clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
955              /* error log:
956              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
957              */
958
959             // event will be triggered after master processes it.
960             return null;
961         }
962
963         // I have control..
964
965         Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
966         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
967         if (event != null) {
968             log.debug("Notifying peers of a device removed topology event for deviceId: {}",
969                       deviceId);
970             notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
971         }
972         if (relinquishAtEnd) {
973             log.debug("Relinquishing temporary role acquired for {}", deviceId);
974             mastershipService.relinquishMastership(deviceId);
975         }
976         return event;
977     }
978
979     private DeviceEvent removeDeviceInternal(DeviceId deviceId,
980                                              Timestamp timestamp) {
981
982         Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
983         synchronized (descs) {
984             // accept removal request if given timestamp is newer than
985             // the latest Timestamp from Primary provider
986             DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
987             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
988             if (timestamp.compareTo(lastTimestamp) <= 0) {
989                 // outdated event ignore
990                 return null;
991             }
992             removalRequest.put(deviceId, timestamp);
993
994             Device device = devices.remove(deviceId);
995             // should DEVICE_REMOVED carry removed ports?
996             Map<PortNumber, Port> ports = devicePorts.get(deviceId);
997             if (ports != null) {
998                 ports.clear();
999             }
1000             markOfflineInternal(deviceId, timestamp);
1001             descs.clear();
1002             return device == null ? null :
1003                     new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1004         }
1005     }
1006
1007     /**
1008      * Checks if given timestamp is superseded by removal request
1009      * with more recent timestamp.
1010      *
1011      * @param deviceId         identifier of a device
1012      * @param timestampToCheck timestamp of an event to check
1013      * @return true if device is already removed
1014      */
1015     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1016         Timestamp removalTimestamp = removalRequest.get(deviceId);
1017         if (removalTimestamp != null &&
1018                 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1019             // removalRequest is more recent
1020             return true;
1021         }
1022         return false;
1023     }
1024
1025     /**
1026      * Returns a Device, merging description given from multiple Providers.
1027      *
1028      * @param deviceId      device identifier
1029      * @param providerDescs Collection of Descriptions from multiple providers
1030      * @return Device instance
1031      */
1032     private Device composeDevice(DeviceId deviceId,
1033                                  Map<ProviderId, DeviceDescriptions> providerDescs) {
1034
1035         checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1036
1037         ProviderId primary = pickPrimaryPID(providerDescs);
1038
1039         DeviceDescriptions desc = providerDescs.get(primary);
1040
1041         final DeviceDescription base = desc.getDeviceDesc().value();
1042         Type type = base.type();
1043         String manufacturer = base.manufacturer();
1044         String hwVersion = base.hwVersion();
1045         String swVersion = base.swVersion();
1046         String serialNumber = base.serialNumber();
1047         ChassisId chassisId = base.chassisId();
1048         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1049         annotations = merge(annotations, base.annotations());
1050
1051         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1052             if (e.getKey().equals(primary)) {
1053                 continue;
1054             }
1055             // Note: should keep track of Description timestamp in the future
1056             // and only merge conflicting keys when timestamp is newer.
1057             // Currently assuming there will never be a key conflict between
1058             // providers
1059
1060             // annotation merging. not so efficient, should revisit later
1061             annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1062         }
1063
1064         return new DefaultDevice(primary, deviceId, type, manufacturer,
1065                                  hwVersion, swVersion, serialNumber,
1066                                  chassisId, annotations);
1067     }
1068
1069     private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1070                                  PortDescription description, Annotations annotations) {
1071         switch (description.type()) {
1072             case OMS:
1073                 OmsPortDescription omsDesc = (OmsPortDescription) description;
1074                 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1075                         omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1076             case OCH:
1077                 OchPortDescription ochDesc = (OchPortDescription) description;
1078                 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1079                         ochDesc.isTunable(), ochDesc.lambda(), annotations);
1080             case ODUCLT:
1081                 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1082                 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1083             default:
1084                 return new DefaultPort(device, number, isEnabled, description.type(),
1085                         description.portSpeed(), annotations);
1086         }
1087     }
1088
1089     /**
1090      * Returns a Port, merging description given from multiple Providers.
1091      *
1092      * @param device   device the port is on
1093      * @param number   port number
1094      * @param descsMap Collection of Descriptions from multiple providers
1095      * @return Port instance
1096      */
1097     private Port composePort(Device device, PortNumber number,
1098                              Map<ProviderId, DeviceDescriptions> descsMap) {
1099
1100         ProviderId primary = pickPrimaryPID(descsMap);
1101         DeviceDescriptions primDescs = descsMap.get(primary);
1102         // if no primary, assume not enabled
1103         boolean isEnabled = false;
1104         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1105         Timestamp newest = null;
1106         final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1107         if (portDesc != null) {
1108             isEnabled = portDesc.value().isEnabled();
1109             annotations = merge(annotations, portDesc.value().annotations());
1110             newest = portDesc.timestamp();
1111         }
1112         Port updated = null;
1113         for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1114             if (e.getKey().equals(primary)) {
1115                 continue;
1116             }
1117             // Note: should keep track of Description timestamp in the future
1118             // and only merge conflicting keys when timestamp is newer.
1119             // Currently assuming there will never be a key conflict between
1120             // providers
1121
1122             // annotation merging. not so efficient, should revisit later
1123             final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1124             if (otherPortDesc != null) {
1125                 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1126                     continue;
1127                 }
1128                 annotations = merge(annotations, otherPortDesc.value().annotations());
1129                 PortDescription other = otherPortDesc.value();
1130                 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1131                 newest = otherPortDesc.timestamp();
1132             }
1133         }
1134         if (portDesc == null) {
1135             return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1136         }
1137         PortDescription current = portDesc.value();
1138         return updated == null
1139                 ? buildTypedPort(device, number, isEnabled, current, annotations)
1140                 : updated;
1141     }
1142
1143     /**
1144      * @return primary ProviderID, or randomly chosen one if none exists
1145      */
1146     private ProviderId pickPrimaryPID(
1147             Map<ProviderId, DeviceDescriptions> providerDescs) {
1148         ProviderId fallBackPrimary = null;
1149         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1150             if (!e.getKey().isAncillary()) {
1151                 return e.getKey();
1152             } else if (fallBackPrimary == null) {
1153                 // pick randomly as a fallback in case there is no primary
1154                 fallBackPrimary = e.getKey();
1155             }
1156         }
1157         return fallBackPrimary;
1158     }
1159
1160     private DeviceDescriptions getPrimaryDescriptions(
1161             Map<ProviderId, DeviceDescriptions> providerDescs) {
1162         ProviderId pid = pickPrimaryPID(providerDescs);
1163         return providerDescs.get(pid);
1164     }
1165
1166     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1167         clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1168     }
1169
1170     private void broadcastMessage(MessageSubject subject, Object event) {
1171         clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1172     }
1173
1174     private void notifyPeers(InternalDeviceEvent event) {
1175         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1176     }
1177
1178     private void notifyPeers(InternalDeviceOfflineEvent event) {
1179         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1180     }
1181
1182     private void notifyPeers(InternalDeviceRemovedEvent event) {
1183         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1184     }
1185
1186     private void notifyPeers(InternalPortEvent event) {
1187         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1188     }
1189
1190     private void notifyPeers(InternalPortStatusEvent event) {
1191         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1192     }
1193
1194     private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1195         try {
1196             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1197         } catch (IOException e) {
1198             log.error("Failed to send" + event + " to " + recipient, e);
1199         }
1200     }
1201
1202     private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1203         try {
1204             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1205         } catch (IOException e) {
1206             log.error("Failed to send" + event + " to " + recipient, e);
1207         }
1208     }
1209
1210     private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1211         try {
1212             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1213         } catch (IOException e) {
1214             log.error("Failed to send" + event + " to " + recipient, e);
1215         }
1216     }
1217
1218     private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1219         try {
1220             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1221         } catch (IOException e) {
1222             log.error("Failed to send" + event + " to " + recipient, e);
1223         }
1224     }
1225
1226     private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1227         try {
1228             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1229         } catch (IOException e) {
1230             log.error("Failed to send" + event + " to " + recipient, e);
1231         }
1232     }
1233
1234     private DeviceAntiEntropyAdvertisement createAdvertisement() {
1235         final NodeId self = clusterService.getLocalNode().id();
1236
1237         final int numDevices = deviceDescs.size();
1238         Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1239         final int portsPerDevice = 8; // random factor to minimize reallocation
1240         Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1241         Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1242
1243         deviceDescs.forEach((deviceId, devDescs) -> {
1244
1245             // for each Device...
1246             synchronized (devDescs) {
1247
1248                 // send device offline timestamp
1249                 Timestamp lOffline = this.offline.get(deviceId);
1250                 if (lOffline != null) {
1251                     adOffline.put(deviceId, lOffline);
1252                 }
1253
1254                 for (Entry<ProviderId, DeviceDescriptions>
1255                         prov : devDescs.entrySet()) {
1256
1257                     // for each Provider Descriptions...
1258                     final ProviderId provId = prov.getKey();
1259                     final DeviceDescriptions descs = prov.getValue();
1260
1261                     adDevices.put(new DeviceFragmentId(deviceId, provId),
1262                                   descs.getDeviceDesc().timestamp());
1263
1264                     for (Entry<PortNumber, Timestamped<PortDescription>>
1265                             portDesc : descs.getPortDescs().entrySet()) {
1266
1267                         final PortNumber number = portDesc.getKey();
1268                         adPorts.put(new PortFragmentId(deviceId, provId, number),
1269                                     portDesc.getValue().timestamp());
1270                     }
1271                 }
1272             }
1273         });
1274
1275         return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1276     }
1277
1278     /**
1279      * Responds to anti-entropy advertisement message.
1280      * <p/>
1281      * Notify sender about out-dated information using regular replication message.
1282      * Send back advertisement to sender if not in sync.
1283      *
1284      * @param advertisement to respond to
1285      */
1286     private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1287
1288         final NodeId sender = advertisement.sender();
1289
1290         Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1291         Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1292         Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1293
1294         // Fragments to request
1295         Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1296         Collection<PortFragmentId> reqPorts = new ArrayList<>();
1297
1298         for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1299             final DeviceId deviceId = de.getKey();
1300             final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1301
1302             synchronized (lDevice) {
1303                 // latestTimestamp across provider
1304                 // Note: can be null initially
1305                 Timestamp localLatest = offline.get(deviceId);
1306
1307                 // handle device Ads
1308                 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1309                     final ProviderId provId = prov.getKey();
1310                     final DeviceDescriptions lDeviceDescs = prov.getValue();
1311
1312                     final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1313
1314
1315                     Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1316                     Timestamp advDevTimestamp = devAds.get(devFragId);
1317
1318                     if (advDevTimestamp == null || lProvDevice.isNewerThan(
1319                             advDevTimestamp)) {
1320                         // remote does not have it or outdated, suggest
1321                         notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1322                     } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1323                         // local is outdated, request
1324                         reqDevices.add(devFragId);
1325                     }
1326
1327                     // handle port Ads
1328                     for (Entry<PortNumber, Timestamped<PortDescription>>
1329                             pe : lDeviceDescs.getPortDescs().entrySet()) {
1330
1331                         final PortNumber num = pe.getKey();
1332                         final Timestamped<PortDescription> lPort = pe.getValue();
1333
1334                         final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1335
1336                         Timestamp advPortTimestamp = portAds.get(portFragId);
1337                         if (advPortTimestamp == null || lPort.isNewerThan(
1338                                 advPortTimestamp)) {
1339                             // remote does not have it or outdated, suggest
1340                             notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1341                         } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1342                             // local is outdated, request
1343                             log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1344                             reqPorts.add(portFragId);
1345                         }
1346
1347                         // remove port Ad already processed
1348                         portAds.remove(portFragId);
1349                     } // end local port loop
1350
1351                     // remove device Ad already processed
1352                     devAds.remove(devFragId);
1353
1354                     // find latest and update
1355                     final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1356                     if (localLatest == null ||
1357                             providerLatest.compareTo(localLatest) > 0) {
1358                         localLatest = providerLatest;
1359                     }
1360                 } // end local provider loop
1361
1362                 // checking if remote timestamp is more recent.
1363                 Timestamp rOffline = offlineAds.get(deviceId);
1364                 if (rOffline != null &&
1365                         rOffline.compareTo(localLatest) > 0) {
1366                     // remote offline timestamp suggests that the
1367                     // device is off-line
1368                     markOfflineInternal(deviceId, rOffline);
1369                 }
1370
1371                 Timestamp lOffline = offline.get(deviceId);
1372                 if (lOffline != null && rOffline == null) {
1373                     // locally offline, but remote is online, suggest offline
1374                     notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1375                 }
1376
1377                 // remove device offline Ad already processed
1378                 offlineAds.remove(deviceId);
1379             } // end local device loop
1380         } // device lock
1381
1382         // If there is any Ads left, request them
1383         log.trace("Ads left {}, {}", devAds, portAds);
1384         reqDevices.addAll(devAds.keySet());
1385         reqPorts.addAll(portAds.keySet());
1386
1387         if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1388             log.trace("Nothing to request to remote peer {}", sender);
1389             return;
1390         }
1391
1392         log.debug("Need to sync {} {}", reqDevices, reqPorts);
1393
1394         // 2-way Anti-Entropy for now
1395         try {
1396             unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1397         } catch (IOException e) {
1398             log.error("Failed to send response advertisement to " + sender, e);
1399         }
1400
1401 // Sketch of 3-way Anti-Entropy
1402 //        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1403 //        ClusterMessage message = new ClusterMessage(
1404 //                clusterService.getLocalNode().id(),
1405 //                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1406 //                SERIALIZER.encode(request));
1407 //
1408 //        try {
1409 //            clusterCommunicator.unicast(message, advertisement.sender());
1410 //        } catch (IOException e) {
1411 //            log.error("Failed to send advertisement reply to "
1412 //                      + advertisement.sender(), e);
1413 //        }
1414     }
1415
1416     private void notifyDelegateIfNotNull(DeviceEvent event) {
1417         if (event != null) {
1418             notifyDelegate(event);
1419         }
1420     }
1421
1422     private final class SendAdvertisementTask implements Runnable {
1423
1424         @Override
1425         public void run() {
1426             if (Thread.currentThread().isInterrupted()) {
1427                 log.debug("Interrupted, quitting");
1428                 return;
1429             }
1430
1431             try {
1432                 final NodeId self = clusterService.getLocalNode().id();
1433                 Set<ControllerNode> nodes = clusterService.getNodes();
1434
1435                 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1436                         .transform(toNodeId())
1437                         .toList();
1438
1439                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1440                     log.trace("No other peers in the cluster.");
1441                     return;
1442                 }
1443
1444                 NodeId peer;
1445                 do {
1446                     int idx = RandomUtils.nextInt(0, nodeIds.size());
1447                     peer = nodeIds.get(idx);
1448                 } while (peer.equals(self));
1449
1450                 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1451
1452                 if (Thread.currentThread().isInterrupted()) {
1453                     log.debug("Interrupted, quitting");
1454                     return;
1455                 }
1456
1457                 try {
1458                     unicastMessage(peer, DEVICE_ADVERTISE, ad);
1459                 } catch (IOException e) {
1460                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
1461                     return;
1462                 }
1463             } catch (Exception e) {
1464                 // catch all Exception to avoid Scheduled task being suppressed.
1465                 log.error("Exception thrown while sending advertisement", e);
1466             }
1467         }
1468     }
1469
1470     private final class InternalDeviceEventListener
1471             implements ClusterMessageHandler {
1472         @Override
1473         public void handle(ClusterMessage message) {
1474             log.debug("Received device update event from peer: {}", message.sender());
1475             InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1476
1477             ProviderId providerId = event.providerId();
1478             DeviceId deviceId = event.deviceId();
1479             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1480
1481             try {
1482                 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1483             } catch (Exception e) {
1484                 log.warn("Exception thrown handling device update", e);
1485             }
1486         }
1487     }
1488
1489     private final class InternalDeviceOfflineEventListener
1490             implements ClusterMessageHandler {
1491         @Override
1492         public void handle(ClusterMessage message) {
1493             log.debug("Received device offline event from peer: {}", message.sender());
1494             InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1495
1496             DeviceId deviceId = event.deviceId();
1497             Timestamp timestamp = event.timestamp();
1498
1499             try {
1500                 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1501             } catch (Exception e) {
1502                 log.warn("Exception thrown handling device offline", e);
1503             }
1504         }
1505     }
1506
1507     private final class InternalRemoveRequestListener
1508             implements ClusterMessageHandler {
1509         @Override
1510         public void handle(ClusterMessage message) {
1511             log.debug("Received device remove request from peer: {}", message.sender());
1512             DeviceId did = SERIALIZER.decode(message.payload());
1513
1514             try {
1515                 removeDevice(did);
1516             } catch (Exception e) {
1517                 log.warn("Exception thrown handling device remove", e);
1518             }
1519         }
1520     }
1521
1522     private final class InternalDeviceRemovedEventListener
1523             implements ClusterMessageHandler {
1524         @Override
1525         public void handle(ClusterMessage message) {
1526             log.debug("Received device removed event from peer: {}", message.sender());
1527             InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1528
1529             DeviceId deviceId = event.deviceId();
1530             Timestamp timestamp = event.timestamp();
1531
1532             try {
1533                 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1534             } catch (Exception e) {
1535                 log.warn("Exception thrown handling device removed", e);
1536             }
1537         }
1538     }
1539
1540     private final class InternalPortEventListener
1541             implements ClusterMessageHandler {
1542         @Override
1543         public void handle(ClusterMessage message) {
1544
1545             log.debug("Received port update event from peer: {}", message.sender());
1546             InternalPortEvent event = SERIALIZER.decode(message.payload());
1547
1548             ProviderId providerId = event.providerId();
1549             DeviceId deviceId = event.deviceId();
1550             Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1551
1552             if (getDevice(deviceId) == null) {
1553                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1554                 // Note: dropped information will be recovered by anti-entropy
1555                 return;
1556             }
1557
1558             try {
1559                 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1560             } catch (Exception e) {
1561                 log.warn("Exception thrown handling port update", e);
1562             }
1563         }
1564     }
1565
1566     private final class InternalPortStatusEventListener
1567             implements ClusterMessageHandler {
1568         @Override
1569         public void handle(ClusterMessage message) {
1570
1571             log.debug("Received port status update event from peer: {}", message.sender());
1572             InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1573
1574             ProviderId providerId = event.providerId();
1575             DeviceId deviceId = event.deviceId();
1576             Timestamped<PortDescription> portDescription = event.portDescription();
1577
1578             if (getDevice(deviceId) == null) {
1579                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1580                 // Note: dropped information will be recovered by anti-entropy
1581                 return;
1582             }
1583
1584             try {
1585                 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1586             } catch (Exception e) {
1587                 log.warn("Exception thrown handling port update", e);
1588             }
1589         }
1590     }
1591
1592     private final class InternalDeviceAdvertisementListener
1593             implements ClusterMessageHandler {
1594         @Override
1595         public void handle(ClusterMessage message) {
1596             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1597             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1598             try {
1599                 handleAdvertisement(advertisement);
1600             } catch (Exception e) {
1601                 log.warn("Exception thrown handling Device advertisements.", e);
1602             }
1603         }
1604     }
1605
1606     private final class DeviceInjectedEventListener
1607             implements ClusterMessageHandler {
1608         @Override
1609         public void handle(ClusterMessage message) {
1610             log.debug("Received injected device event from peer: {}", message.sender());
1611             DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1612
1613             ProviderId providerId = event.providerId();
1614             DeviceId deviceId = event.deviceId();
1615             DeviceDescription deviceDescription = event.deviceDescription();
1616             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1617                 // workaround for ONOS-1208
1618                 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1619                 return;
1620             }
1621
1622             try {
1623                 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1624             } catch (Exception e) {
1625                 log.warn("Exception thrown handling device injected event.", e);
1626             }
1627         }
1628     }
1629
1630     private final class PortInjectedEventListener
1631             implements ClusterMessageHandler {
1632         @Override
1633         public void handle(ClusterMessage message) {
1634             log.debug("Received injected port event from peer: {}", message.sender());
1635             PortInjectedEvent event = SERIALIZER.decode(message.payload());
1636
1637             ProviderId providerId = event.providerId();
1638             DeviceId deviceId = event.deviceId();
1639             List<PortDescription> portDescriptions = event.portDescriptions();
1640             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1641                 // workaround for ONOS-1208
1642                 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1643                 return;
1644             }
1645
1646             try {
1647                 updatePorts(providerId, deviceId, portDescriptions);
1648             } catch (Exception e) {
1649                 log.warn("Exception thrown handling port injected event.", e);
1650             }
1651         }
1652     }
1653
1654     private class InternalPortStatsListener
1655             implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1656         @Override
1657         public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1658             if (event.type() == PUT) {
1659                 Device device = devices.get(event.key());
1660                 if (device != null) {
1661                     delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
1662                 }
1663             }
1664         }
1665     }
1666 }