2  * Copyright 2014-2015 Open Networking Laboratory
 
   4  * Licensed under the Apache License, Version 2.0 (the "License");
 
   5  * you may not use this file except in compliance with the License.
 
   6  * You may obtain a copy of the License at
 
   8  *     http://www.apache.org/licenses/LICENSE-2.0
 
  10  * Unless required by applicable law or agreed to in writing, software
 
  11  * distributed under the License is distributed on an "AS IS" BASIS,
 
  12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  13  * See the License for the specific language governing permissions and
 
  14  * limitations under the License.
 
  16 package org.onosproject.store.device.impl;
 
  18 import com.google.common.base.Function;
 
  19 import com.google.common.collect.FluentIterable;
 
  20 import com.google.common.collect.ImmutableList;
 
  21 import com.google.common.collect.Maps;
 
  22 import com.google.common.collect.Sets;
 
  24 import org.apache.commons.lang3.RandomUtils;
 
  25 import org.apache.felix.scr.annotations.Activate;
 
  26 import org.apache.felix.scr.annotations.Component;
 
  27 import org.apache.felix.scr.annotations.Deactivate;
 
  28 import org.apache.felix.scr.annotations.Reference;
 
  29 import org.apache.felix.scr.annotations.ReferenceCardinality;
 
  30 import org.apache.felix.scr.annotations.Service;
 
  31 import org.onlab.packet.ChassisId;
 
  32 import org.onlab.util.KryoNamespace;
 
  33 import org.onlab.util.NewConcurrentHashMap;
 
  34 import org.onosproject.cluster.ClusterService;
 
  35 import org.onosproject.cluster.ControllerNode;
 
  36 import org.onosproject.cluster.NodeId;
 
  37 import org.onosproject.mastership.MastershipService;
 
  38 import org.onosproject.mastership.MastershipTerm;
 
  39 import org.onosproject.mastership.MastershipTermService;
 
  40 import org.onosproject.net.Annotations;
 
  41 import org.onosproject.net.AnnotationsUtil;
 
  42 import org.onosproject.net.DefaultAnnotations;
 
  43 import org.onosproject.net.DefaultDevice;
 
  44 import org.onosproject.net.DefaultPort;
 
  45 import org.onosproject.net.Device;
 
  46 import org.onosproject.net.Device.Type;
 
  47 import org.onosproject.net.DeviceId;
 
  48 import org.onosproject.net.MastershipRole;
 
  49 import org.onosproject.net.OchPort;
 
  50 import org.onosproject.net.OduCltPort;
 
  51 import org.onosproject.net.OmsPort;
 
  52 import org.onosproject.net.Port;
 
  53 import org.onosproject.net.PortNumber;
 
  54 import org.onosproject.net.device.DefaultPortStatistics;
 
  55 import org.onosproject.net.device.DeviceClockService;
 
  56 import org.onosproject.net.device.DeviceDescription;
 
  57 import org.onosproject.net.device.DeviceEvent;
 
  58 import org.onosproject.net.device.DeviceStore;
 
  59 import org.onosproject.net.device.DeviceStoreDelegate;
 
  60 import org.onosproject.net.device.OchPortDescription;
 
  61 import org.onosproject.net.device.OduCltPortDescription;
 
  62 import org.onosproject.net.device.OmsPortDescription;
 
  63 import org.onosproject.net.device.PortDescription;
 
  64 import org.onosproject.net.device.PortStatistics;
 
  65 import org.onosproject.net.provider.ProviderId;
 
  66 import org.onosproject.store.AbstractStore;
 
  67 import org.onosproject.store.Timestamp;
 
  68 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 
  69 import org.onosproject.store.cluster.messaging.ClusterMessage;
 
  70 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 
  71 import org.onosproject.store.cluster.messaging.MessageSubject;
 
  72 import org.onosproject.store.impl.Timestamped;
 
  73 import org.onosproject.store.serializers.KryoNamespaces;
 
  74 import org.onosproject.store.serializers.KryoSerializer;
 
  75 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
 
  76 import org.onosproject.store.service.EventuallyConsistentMap;
 
  77 import org.onosproject.store.service.EventuallyConsistentMapEvent;
 
  78 import org.onosproject.store.service.EventuallyConsistentMapListener;
 
  79 import org.onosproject.store.service.MultiValuedTimestamp;
 
  80 import org.onosproject.store.service.StorageService;
 
  81 import org.onosproject.store.service.WallClockTimestamp;
 
  82 import org.slf4j.Logger;
 
  84 import java.io.IOException;
 
  85 import java.util.ArrayList;
 
  86 import java.util.Collection;
 
  87 import java.util.Collections;
 
  88 import java.util.HashMap;
 
  89 import java.util.HashSet;
 
  90 import java.util.Iterator;
 
  91 import java.util.List;
 
  93 import java.util.Map.Entry;
 
  94 import java.util.Objects;
 
  96 import java.util.concurrent.ConcurrentMap;
 
  97 import java.util.concurrent.ExecutorService;
 
  98 import java.util.concurrent.Executors;
 
  99 import java.util.concurrent.ScheduledExecutorService;
 
 100 import java.util.concurrent.TimeUnit;
 
 102 import static com.google.common.base.Preconditions.checkArgument;
 
 103 import static com.google.common.base.Predicates.notNull;
 
 104 import static com.google.common.base.Verify.verify;
 
 105 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 
 106 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
 
 107 import static org.onlab.util.Tools.groupedThreads;
 
 108 import static org.onlab.util.Tools.minPriority;
 
 109 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
 
 110 import static org.onosproject.net.DefaultAnnotations.merge;
 
 111 import static org.onosproject.net.device.DeviceEvent.Type.*;
 
 112 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
 
 113 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
 
 114 import static org.slf4j.LoggerFactory.getLogger;
 
 117  * Manages inventory of infrastructure devices using gossip protocol to distribute
 
 120 @Component(immediate = true)
 
 122 public class GossipDeviceStore
 
 123         extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
 
 124         implements DeviceStore {
 
 126     private final Logger log = getLogger(getClass());
 
 128     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
 
 129     // Timeout in milliseconds to process device or ports on remote master node
 
 130     private static final int REMOTE_MASTER_TIMEOUT = 1000;
 
 132     // innerMap is used to lock a Device, thus instance should never be replaced.
 
 133     // collection of Description given from various providers
 
 134     private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
 
 135             deviceDescs = Maps.newConcurrentMap();
 
 137     // cache of Device and Ports generated by compositing descriptions from providers
 
 138     private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
 
 139     private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
 
 141     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
 
 142     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
 
 143     private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
 
 144             portStatsListener = new InternalPortStatsListener();
 
 146     // to be updated under Device lock
 
 147     private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
 
 148     private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
 
 150     // available(=UP) devices
 
 151     private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
 
 153     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 154     protected DeviceClockService deviceClockService;
 
 156     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 157     protected StorageService storageService;
 
 159     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 160     protected ClusterCommunicationService clusterCommunicator;
 
 162     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 163     protected ClusterService clusterService;
 
 165     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 166     protected MastershipService mastershipService;
 
 168     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
 
 169     protected MastershipTermService termService;
 
 172     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
 
 174         protected void setupKryoPool() {
 
 175             serializerPool = KryoNamespace.newBuilder()
 
 176                     .register(DistributedStoreSerializers.STORE_COMMON)
 
 177                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
 
 178                     .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
 
 179                     .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
 
 180                     .register(InternalDeviceRemovedEvent.class)
 
 181                     .register(new InternalPortEventSerializer(), InternalPortEvent.class)
 
 182                     .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
 
 183                     .register(DeviceAntiEntropyAdvertisement.class)
 
 184                     .register(DeviceFragmentId.class)
 
 185                     .register(PortFragmentId.class)
 
 186                     .register(DeviceInjectedEvent.class)
 
 187                     .register(PortInjectedEvent.class)
 
 192     private ExecutorService executor;
 
 194     private ScheduledExecutorService backgroundExecutor;
 
 196     // TODO make these anti-entropy parameters configurable
 
 197     private long initialDelaySec = 5;
 
 198     private long periodSec = 5;
 
 201     public void activate() {
 
 202         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
 
 205                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
 
 207         clusterCommunicator.addSubscriber(
 
 208                 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
 
 209         clusterCommunicator.addSubscriber(
 
 210                 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
 
 211                 new InternalDeviceOfflineEventListener(),
 
 213         clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
 
 214                                           new InternalRemoveRequestListener(),
 
 216         clusterCommunicator.addSubscriber(
 
 217                 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
 
 218         clusterCommunicator.addSubscriber(
 
 219                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
 
 220         clusterCommunicator.addSubscriber(
 
 221                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
 
 222         clusterCommunicator.addSubscriber(
 
 223                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
 
 224                 new InternalDeviceAdvertisementListener(),
 
 226         clusterCommunicator.addSubscriber(
 
 227                 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
 
 228         clusterCommunicator.addSubscriber(
 
 229                 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
 
 231         // start anti-entropy thread
 
 232         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
 
 233                                                initialDelaySec, periodSec, TimeUnit.SECONDS);
 
 235         // Create a distributed map for port stats.
 
 236         KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
 
 237                 .register(KryoNamespaces.API)
 
 238                 .register(DefaultPortStatistics.class)
 
 239                 .register(DeviceId.class)
 
 240                 .register(MultiValuedTimestamp.class)
 
 241                 .register(WallClockTimestamp.class);
 
 243         devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
 
 244                 .withName("port-stats")
 
 245                 .withSerializer(deviceDataSerializer)
 
 246                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
 
 247                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
 
 248                 .withTombstonesDisabled()
 
 250         devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
 
 251                 eventuallyConsistentMapBuilder()
 
 252                 .withName("port-stats-delta")
 
 253                 .withSerializer(deviceDataSerializer)
 
 254                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
 
 255                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
 
 256                 .withTombstonesDisabled()
 
 258         devicePortStats.addListener(portStatsListener);
 
 263     public void deactivate() {
 
 264         devicePortStats.destroy();
 
 265         devicePortDeltaStats.destroy();
 
 266         executor.shutdownNow();
 
 268         backgroundExecutor.shutdownNow();
 
 270             if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
 
 271                 log.error("Timeout during executor shutdown");
 
 273         } catch (InterruptedException e) {
 
 274             log.error("Error during executor shutdown", e);
 
 280         availableDevices.clear();
 
 285     public int getDeviceCount() {
 
 286         return devices.size();
 
 290     public Iterable<Device> getDevices() {
 
 291         return Collections.unmodifiableCollection(devices.values());
 
 295     public Iterable<Device> getAvailableDevices() {
 
 296         return FluentIterable.from(getDevices())
 
 297                 .filter(input -> isAvailable(input.id()));
 
 301     public Device getDevice(DeviceId deviceId) {
 
 302         return devices.get(deviceId);
 
 306     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
 
 308                                                          DeviceDescription deviceDescription) {
 
 309         NodeId localNode = clusterService.getLocalNode().id();
 
 310         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
 
 312         // Process device update only if we're the master,
 
 313         // otherwise signal the actual master.
 
 314         DeviceEvent deviceEvent = null;
 
 315         if (localNode.equals(deviceNode)) {
 
 317             final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
 
 318             final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
 
 319             final Timestamped<DeviceDescription> mergedDesc;
 
 320             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
 
 322             synchronized (device) {
 
 323                 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
 
 324                 mergedDesc = device.get(providerId).getDeviceDesc();
 
 327             if (deviceEvent != null) {
 
 328                 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
 
 329                          providerId, deviceId);
 
 330                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
 
 334             // FIXME Temporary hack for NPE (ONOS-1171).
 
 335             // Proper fix is to implement forwarding to master on ConfigProvider
 
 337             if (deviceNode == null) {
 
 343             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
 
 344                     providerId, deviceId, deviceDescription);
 
 346             // TODO check unicast return value
 
 347             clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
 
 349             log.warn("Failed to process injected device id: {} desc: {} " +
 
 350                             "(cluster messaging failed: {})",
 
 351                     deviceId, deviceDescription, e);
 
 358     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
 
 360                                                      Timestamped<DeviceDescription> deltaDesc) {
 
 362         // Collection of DeviceDescriptions for a Device
 
 363         Map<ProviderId, DeviceDescriptions> device
 
 364                 = getOrCreateDeviceDescriptionsMap(deviceId);
 
 366         synchronized (device) {
 
 367             // locking per device
 
 369             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
 
 370                 log.debug("Ignoring outdated event: {}", deltaDesc);
 
 374             DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
 
 376             final Device oldDevice = devices.get(deviceId);
 
 377             final Device newDevice;
 
 379             if (deltaDesc == descs.getDeviceDesc() ||
 
 380                     deltaDesc.isNewer(descs.getDeviceDesc())) {
 
 381                 // on new device or valid update
 
 382                 descs.putDeviceDesc(deltaDesc);
 
 383                 newDevice = composeDevice(deviceId, device);
 
 385                 // outdated event, ignored.
 
 388             if (oldDevice == null) {
 
 390                 return createDevice(providerId, newDevice, deltaDesc.timestamp());
 
 392                 // UPDATE or ignore (no change or stale)
 
 393                 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
 
 398     // Creates the device and returns the appropriate event if necessary.
 
 399     // Guarded by deviceDescs value (=Device lock)
 
 400     private DeviceEvent createDevice(ProviderId providerId,
 
 401                                      Device newDevice, Timestamp timestamp) {
 
 403         // update composed device cache
 
 404         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
 
 405         verify(oldDevice == null,
 
 406                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
 
 407                providerId, oldDevice, newDevice);
 
 409         if (!providerId.isAncillary()) {
 
 410             markOnline(newDevice.id(), timestamp);
 
 413         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
 
 416     // Updates the device and returns the appropriate event if necessary.
 
 417     // Guarded by deviceDescs value (=Device lock)
 
 418     private DeviceEvent updateDevice(ProviderId providerId,
 
 420                                      Device newDevice, Timestamp newTimestamp) {
 
 421         // We allow only certain attributes to trigger update
 
 422         boolean propertiesChanged =
 
 423                 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
 
 424                         !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
 
 425                         !Objects.equals(oldDevice.providerId(), newDevice.providerId());
 
 426         boolean annotationsChanged =
 
 427                 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
 
 429         // Primary providers can respond to all changes, but ancillary ones
 
 430         // should respond only to annotation changes.
 
 431         if ((providerId.isAncillary() && annotationsChanged) ||
 
 432                 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
 
 433             boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
 
 436                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
 
 437                        providerId, oldDevice, devices.get(newDevice.id())
 
 440             if (!providerId.isAncillary()) {
 
 441                 boolean wasOnline = availableDevices.contains(newDevice.id());
 
 442                 markOnline(newDevice.id(), newTimestamp);
 
 444                     notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
 
 448             return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
 
 454     public DeviceEvent markOffline(DeviceId deviceId) {
 
 455         final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
 
 456         final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
 
 458             log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
 
 459                      deviceId, timestamp);
 
 460             notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
 
 465     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
 
 467         Map<ProviderId, DeviceDescriptions> providerDescs
 
 468                 = getOrCreateDeviceDescriptionsMap(deviceId);
 
 471         synchronized (providerDescs) {
 
 473             // accept off-line if given timestamp is newer than
 
 474             // the latest Timestamp from Primary provider
 
 475             DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
 
 476             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
 
 477             if (timestamp.compareTo(lastTimestamp) <= 0) {
 
 478                 // outdated event ignore
 
 482             offline.put(deviceId, timestamp);
 
 484             Device device = devices.get(deviceId);
 
 485             if (device == null) {
 
 488             boolean removed = availableDevices.remove(deviceId);
 
 490                 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
 
 497      * Marks the device as available if the given timestamp is not outdated,
 
 498      * compared to the time the device has been marked offline.
 
 500      * @param deviceId  identifier of the device
 
 501      * @param timestamp of the event triggering this change.
 
 502      * @return true if availability change request was accepted and changed the state
 
 504     // Guarded by deviceDescs value (=Device lock)
 
 505     private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
 
 506         // accept on-line if given timestamp is newer than
 
 507         // the latest offline request Timestamp
 
 508         Timestamp offlineTimestamp = offline.get(deviceId);
 
 509         if (offlineTimestamp == null ||
 
 510                 offlineTimestamp.compareTo(timestamp) < 0) {
 
 512             offline.remove(deviceId);
 
 513             return availableDevices.add(deviceId);
 
 519     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
 
 521                                                       List<PortDescription> portDescriptions) {
 
 523         NodeId localNode = clusterService.getLocalNode().id();
 
 524         // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
 
 525         // since it will trigger distributed store read.
 
 526         // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
 
 527         // outside Device subsystem. so that we don't have to modify both Device and Link stores.
 
 528         // If we don't care much about topology performance, then it might be OK.
 
 529         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
 
 531         // Process port update only if we're the master of the device,
 
 532         // otherwise signal the actual master.
 
 533         List<DeviceEvent> deviceEvents = null;
 
 534         if (localNode.equals(deviceNode)) {
 
 536             final Timestamp newTimestamp;
 
 538                 newTimestamp = deviceClockService.getTimestamp(deviceId);
 
 539             } catch (IllegalStateException e) {
 
 540                 log.info("Timestamp was not available for device {}", deviceId);
 
 541                 log.debug("  discarding {}", portDescriptions);
 
 542                 // Failed to generate timestamp.
 
 544                 // Possible situation:
 
 545                 //  Device connected and became master for short period of time,
 
 546                 // but lost mastership before this instance had the chance to
 
 547                 // retrieve term information.
 
 549                 // Information dropped here is expected to be recoverable by
 
 550                 // device probing after mastership change
 
 552                 return Collections.emptyList();
 
 554             log.debug("timestamp for {} {}", deviceId, newTimestamp);
 
 556             final Timestamped<List<PortDescription>> timestampedInput
 
 557                     = new Timestamped<>(portDescriptions, newTimestamp);
 
 558             final Timestamped<List<PortDescription>> merged;
 
 560             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
 
 562             synchronized (device) {
 
 563                 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
 
 564                 final DeviceDescriptions descs = device.get(providerId);
 
 565                 List<PortDescription> mergedList =
 
 566                         FluentIterable.from(portDescriptions)
 
 567                                 .transform(new Function<PortDescription, PortDescription>() {
 
 569                                     public PortDescription apply(PortDescription input) {
 
 570                                         // lookup merged port description
 
 571                                         return descs.getPortDesc(input.portNumber()).value();
 
 574                 merged = new Timestamped<>(mergedList, newTimestamp);
 
 577             if (!deviceEvents.isEmpty()) {
 
 578                 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
 
 579                          providerId, deviceId);
 
 580                 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
 
 584             // FIXME Temporary hack for NPE (ONOS-1171).
 
 585             // Proper fix is to implement forwarding to master on ConfigProvider
 
 587             if (deviceNode == null) {
 
 589                 return Collections.emptyList();
 
 592             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
 
 594             //TODO check unicast return value
 
 595             clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
 
 597             log.warn("Failed to process injected ports of device id: {} " +
 
 598                             "(cluster messaging failed: {})",
 
 603         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
 
 606     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
 
 608                                                   Timestamped<List<PortDescription>> portDescriptions) {
 
 610         Device device = devices.get(deviceId);
 
 611         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
 
 613         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
 
 614         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
 616         List<DeviceEvent> events = new ArrayList<>();
 
 617         synchronized (descsMap) {
 
 619             if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
 
 620                 log.debug("Ignoring outdated events: {}", portDescriptions);
 
 621                 return Collections.emptyList();
 
 624             DeviceDescriptions descs = descsMap.get(providerId);
 
 625             // every provider must provide DeviceDescription.
 
 626             checkArgument(descs != null,
 
 627                           "Device description for Device ID %s from Provider %s was not found",
 
 628                           deviceId, providerId);
 
 630             Map<PortNumber, Port> ports = getPortMap(deviceId);
 
 632             final Timestamp newTimestamp = portDescriptions.timestamp();
 
 635             Set<PortNumber> processed = new HashSet<>();
 
 636             for (PortDescription portDescription : portDescriptions.value()) {
 
 637                 final PortNumber number = portDescription.portNumber();
 
 638                 processed.add(number);
 
 640                 final Port oldPort = ports.get(number);
 
 644                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
 
 645                 if (existingPortDesc == null ||
 
 646                         newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
 
 647                     // on new port or valid update
 
 648                     // update description
 
 649                     descs.putPortDesc(new Timestamped<>(portDescription,
 
 650                                                         portDescriptions.timestamp()));
 
 651                     newPort = composePort(device, number, descsMap);
 
 653                     // outdated event, ignored.
 
 657                 events.add(oldPort == null ?
 
 658                                    createPort(device, newPort, ports) :
 
 659                                    updatePort(device, oldPort, newPort, ports));
 
 662             events.addAll(pruneOldPorts(device, ports, processed));
 
 664         return FluentIterable.from(events).filter(notNull()).toList();
 
 667     // Creates a new port based on the port description adds it to the map and
 
 668     // Returns corresponding event.
 
 669     // Guarded by deviceDescs value (=Device lock)
 
 670     private DeviceEvent createPort(Device device, Port newPort,
 
 671                                    Map<PortNumber, Port> ports) {
 
 672         ports.put(newPort.number(), newPort);
 
 673         return new DeviceEvent(PORT_ADDED, device, newPort);
 
 676     // Checks if the specified port requires update and if so, it replaces the
 
 677     // existing entry in the map and returns corresponding event.
 
 678     // Guarded by deviceDescs value (=Device lock)
 
 679     private DeviceEvent updatePort(Device device, Port oldPort,
 
 681                                    Map<PortNumber, Port> ports) {
 
 682         if (oldPort.isEnabled() != newPort.isEnabled() ||
 
 683                 oldPort.type() != newPort.type() ||
 
 684                 oldPort.portSpeed() != newPort.portSpeed() ||
 
 685                 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
 
 686             ports.put(oldPort.number(), newPort);
 
 687             return new DeviceEvent(PORT_UPDATED, device, newPort);
 
 692     // Prunes the specified list of ports based on which ports are in the
 
 693     // processed list and returns list of corresponding events.
 
 694     // Guarded by deviceDescs value (=Device lock)
 
 695     private List<DeviceEvent> pruneOldPorts(Device device,
 
 696                                             Map<PortNumber, Port> ports,
 
 697                                             Set<PortNumber> processed) {
 
 698         List<DeviceEvent> events = new ArrayList<>();
 
 699         Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
 
 700         while (iterator.hasNext()) {
 
 701             Entry<PortNumber, Port> e = iterator.next();
 
 702             PortNumber portNumber = e.getKey();
 
 703             if (!processed.contains(portNumber)) {
 
 704                 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
 
 711     // Gets the map of ports for the specified device; if one does not already
 
 712     // exist, it creates and registers a new one.
 
 713     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
 
 714         return createIfAbsentUnchecked(devicePorts, deviceId,
 
 715                                        NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
 
 718     private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
 
 720         Map<ProviderId, DeviceDescriptions> r;
 
 721         r = deviceDescs.get(deviceId);
 
 724             final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
 
 725             concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
 
 726             if (concurrentlyAdded != null) {
 
 727                 r = concurrentlyAdded;
 
 733     // Guarded by deviceDescs value (=Device lock)
 
 734     private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
 
 735             Map<ProviderId, DeviceDescriptions> device,
 
 736             ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
 
 737         synchronized (device) {
 
 738             DeviceDescriptions r = device.get(providerId);
 
 740                 r = new DeviceDescriptions(deltaDesc);
 
 741                 device.put(providerId, r);
 
 748     public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
 
 750                                                      PortDescription portDescription) {
 
 751         final Timestamp newTimestamp;
 
 753             newTimestamp = deviceClockService.getTimestamp(deviceId);
 
 754         } catch (IllegalStateException e) {
 
 755             log.info("Timestamp was not available for device {}", deviceId);
 
 756             log.debug("  discarding {}", portDescription);
 
 757             // Failed to generate timestamp. Ignoring.
 
 758             // See updatePorts comment
 
 761         final Timestamped<PortDescription> deltaDesc
 
 762                 = new Timestamped<>(portDescription, newTimestamp);
 
 763         final DeviceEvent event;
 
 764         final Timestamped<PortDescription> mergedDesc;
 
 765         final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
 
 766         synchronized (device) {
 
 767             event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
 
 768             mergedDesc = device.get(providerId)
 
 769                     .getPortDesc(portDescription.portNumber());
 
 772             log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
 
 773                      providerId, deviceId);
 
 774             notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
 
 779     private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
 
 780                                                  Timestamped<PortDescription> deltaDesc) {
 
 781         Device device = devices.get(deviceId);
 
 782         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
 
 784         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
 
 785         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
 
 787         synchronized (descsMap) {
 
 789             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
 
 790                 log.debug("Ignoring outdated event: {}", deltaDesc);
 
 794             DeviceDescriptions descs = descsMap.get(providerId);
 
 795             // assuming all providers must to give DeviceDescription
 
 796             verify(descs != null,
 
 797                    "Device description for Device ID %s from Provider %s was not found",
 
 798                    deviceId, providerId);
 
 800             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
 
 801             final PortNumber number = deltaDesc.value().portNumber();
 
 802             final Port oldPort = ports.get(number);
 
 805             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
 
 806             if (existingPortDesc == null ||
 
 807                     deltaDesc.isNewer(existingPortDesc)) {
 
 808                 // on new port or valid update
 
 809                 // update description
 
 810                 descs.putPortDesc(deltaDesc);
 
 811                 newPort = composePort(device, number, descsMap);
 
 813                 // same or outdated event, ignored.
 
 814                 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
 
 818             if (oldPort == null) {
 
 819                 return createPort(device, newPort, ports);
 
 821                 return updatePort(device, oldPort, newPort, ports);
 
 827     public List<Port> getPorts(DeviceId deviceId) {
 
 828         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
 
 830             return Collections.emptyList();
 
 832         return ImmutableList.copyOf(ports.values());
 
 836     public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
 
 837                                             Collection<PortStatistics> newStatsCollection) {
 
 839         Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
 
 840         Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
 
 841         Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
 
 843         if (prvStatsMap != null) {
 
 844             for (PortStatistics newStats : newStatsCollection) {
 
 845                 PortNumber port = PortNumber.portNumber(newStats.port());
 
 846                 PortStatistics prvStats = prvStatsMap.get(port);
 
 847                 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
 
 848                 PortStatistics deltaStats = builder.build();
 
 849                 if (prvStats != null) {
 
 850                     deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
 
 852                 deltaStatsMap.put(port, deltaStats);
 
 853                 newStatsMap.put(port, newStats);
 
 856             for (PortStatistics newStats : newStatsCollection) {
 
 857                 PortNumber port = PortNumber.portNumber(newStats.port());
 
 858                 newStatsMap.put(port, newStats);
 
 861         devicePortDeltaStats.put(deviceId, deltaStatsMap);
 
 862         devicePortStats.put(deviceId, newStatsMap);
 
 863         // DeviceEvent returns null because of InternalPortStatsListener usage
 
 868      * Calculate delta statistics by subtracting previous from new statistics.
 
 870      * @param deviceId device identifier
 
 871      * @param prvStats previous port statistics
 
 872      * @param newStats new port statistics
 
 873      * @return PortStatistics
 
 875     public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
 
 876         // calculate time difference
 
 877         long deltaStatsSec, deltaStatsNano;
 
 878         if (newStats.durationNano() < prvStats.durationNano()) {
 
 879             deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
 
 880             deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
 
 882             deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
 
 883             deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
 
 885         DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
 
 886         DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
 
 887                 .setPort(newStats.port())
 
 888                 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
 
 889                 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
 
 890                 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
 
 891                 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
 
 892                 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
 
 893                 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
 
 894                 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
 
 895                 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
 
 896                 .setDurationSec(deltaStatsSec)
 
 897                 .setDurationNano(deltaStatsNano)
 
 903     public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
 
 904         Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
 
 905         if (portStats == null) {
 
 906             return Collections.emptyList();
 
 908         return ImmutableList.copyOf(portStats.values());
 
 912     public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
 
 913         Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
 
 914         if (portStats == null) {
 
 915             return Collections.emptyList();
 
 917         return ImmutableList.copyOf(portStats.values());
 
 921     public Port getPort(DeviceId deviceId, PortNumber portNumber) {
 
 922         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
 
 923         return ports == null ? null : ports.get(portNumber);
 
 927     public boolean isAvailable(DeviceId deviceId) {
 
 928         return availableDevices.contains(deviceId);
 
 932     public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
 
 933         final NodeId myId = clusterService.getLocalNode().id();
 
 934         NodeId master = mastershipService.getMasterFor(deviceId);
 
 936         // if there exist a master, forward
 
 937         // if there is no master, try to become one and process
 
 939         boolean relinquishAtEnd = false;
 
 940         if (master == null) {
 
 941             final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
 
 942             if (myRole != MastershipRole.NONE) {
 
 943                 relinquishAtEnd = true;
 
 945             log.debug("Temporarily requesting role for {} to remove", deviceId);
 
 946             mastershipService.requestRoleFor(deviceId);
 
 947             MastershipTerm term = termService.getMastershipTerm(deviceId);
 
 948             if (term != null && myId.equals(term.master())) {
 
 953         if (!myId.equals(master)) {
 
 954             log.debug("{} has control of {}, forwarding remove request",
 
 957             // TODO check unicast return value
 
 958             clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
 
 960              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
 
 963             // event will be triggered after master processes it.
 
 969         Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
 
 970         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
 
 972             log.debug("Notifying peers of a device removed topology event for deviceId: {}",
 
 974             notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
 
 976         if (relinquishAtEnd) {
 
 977             log.debug("Relinquishing temporary role acquired for {}", deviceId);
 
 978             mastershipService.relinquishMastership(deviceId);
 
 983     private DeviceEvent removeDeviceInternal(DeviceId deviceId,
 
 984                                              Timestamp timestamp) {
 
 986         Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
 
 987         synchronized (descs) {
 
 988             // accept removal request if given timestamp is newer than
 
 989             // the latest Timestamp from Primary provider
 
 990             DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
 
 991             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
 
 992             if (timestamp.compareTo(lastTimestamp) <= 0) {
 
 993                 // outdated event ignore
 
 996             removalRequest.put(deviceId, timestamp);
 
 998             Device device = devices.remove(deviceId);
 
 999             // should DEVICE_REMOVED carry removed ports?
 
1000             Map<PortNumber, Port> ports = devicePorts.get(deviceId);
 
1001             if (ports != null) {
 
1004             markOfflineInternal(deviceId, timestamp);
 
1006             return device == null ? null :
 
1007                     new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
 
1012      * Checks if given timestamp is superseded by removal request
 
1013      * with more recent timestamp.
 
1015      * @param deviceId         identifier of a device
 
1016      * @param timestampToCheck timestamp of an event to check
 
1017      * @return true if device is already removed
 
1019     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
 
1020         Timestamp removalTimestamp = removalRequest.get(deviceId);
 
1021         if (removalTimestamp != null &&
 
1022                 removalTimestamp.compareTo(timestampToCheck) >= 0) {
 
1023             // removalRequest is more recent
 
1030      * Returns a Device, merging description given from multiple Providers.
 
1032      * @param deviceId      device identifier
 
1033      * @param providerDescs Collection of Descriptions from multiple providers
 
1034      * @return Device instance
 
1036     private Device composeDevice(DeviceId deviceId,
 
1037                                  Map<ProviderId, DeviceDescriptions> providerDescs) {
 
1039         checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
 
1041         ProviderId primary = pickPrimaryPID(providerDescs);
 
1043         DeviceDescriptions desc = providerDescs.get(primary);
 
1045         final DeviceDescription base = desc.getDeviceDesc().value();
 
1046         Type type = base.type();
 
1047         String manufacturer = base.manufacturer();
 
1048         String hwVersion = base.hwVersion();
 
1049         String swVersion = base.swVersion();
 
1050         String serialNumber = base.serialNumber();
 
1051         ChassisId chassisId = base.chassisId();
 
1052         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
 
1053         annotations = merge(annotations, base.annotations());
 
1055         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
 
1056             if (e.getKey().equals(primary)) {
 
1059             // Note: should keep track of Description timestamp in the future
 
1060             // and only merge conflicting keys when timestamp is newer.
 
1061             // Currently assuming there will never be a key conflict between
 
1064             // annotation merging. not so efficient, should revisit later
 
1065             annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
 
1068         return new DefaultDevice(primary, deviceId, type, manufacturer,
 
1069                                  hwVersion, swVersion, serialNumber,
 
1070                                  chassisId, annotations);
 
1073     private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
 
1074                                  PortDescription description, Annotations annotations) {
 
1075         switch (description.type()) {
 
1077                 OmsPortDescription omsDesc = (OmsPortDescription) description;
 
1078                 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
 
1079                         omsDesc.maxFrequency(), omsDesc.grid(), annotations);
 
1081                 OchPortDescription ochDesc = (OchPortDescription) description;
 
1082                 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
 
1083                         ochDesc.isTunable(), ochDesc.lambda(), annotations);
 
1085                 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
 
1086                 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
 
1088                 return new DefaultPort(device, number, isEnabled, description.type(),
 
1089                         description.portSpeed(), annotations);
 
1094      * Returns a Port, merging description given from multiple Providers.
 
1096      * @param device   device the port is on
 
1097      * @param number   port number
 
1098      * @param descsMap Collection of Descriptions from multiple providers
 
1099      * @return Port instance
 
1101     private Port composePort(Device device, PortNumber number,
 
1102                              Map<ProviderId, DeviceDescriptions> descsMap) {
 
1104         ProviderId primary = pickPrimaryPID(descsMap);
 
1105         DeviceDescriptions primDescs = descsMap.get(primary);
 
1106         // if no primary, assume not enabled
 
1107         boolean isEnabled = false;
 
1108         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
 
1109         Timestamp newest = null;
 
1110         final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
 
1111         if (portDesc != null) {
 
1112             isEnabled = portDesc.value().isEnabled();
 
1113             annotations = merge(annotations, portDesc.value().annotations());
 
1114             newest = portDesc.timestamp();
 
1116         Port updated = null;
 
1117         for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
 
1118             if (e.getKey().equals(primary)) {
 
1121             // Note: should keep track of Description timestamp in the future
 
1122             // and only merge conflicting keys when timestamp is newer.
 
1123             // Currently assuming there will never be a key conflict between
 
1126             // annotation merging. not so efficient, should revisit later
 
1127             final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
 
1128             if (otherPortDesc != null) {
 
1129                 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
 
1132                 annotations = merge(annotations, otherPortDesc.value().annotations());
 
1133                 PortDescription other = otherPortDesc.value();
 
1134                 updated = buildTypedPort(device, number, isEnabled, other, annotations);
 
1135                 newest = otherPortDesc.timestamp();
 
1138         if (portDesc == null) {
 
1139             return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
 
1141         PortDescription current = portDesc.value();
 
1142         return updated == null
 
1143                 ? buildTypedPort(device, number, isEnabled, current, annotations)
 
1148      * @return primary ProviderID, or randomly chosen one if none exists
 
1150     private ProviderId pickPrimaryPID(
 
1151             Map<ProviderId, DeviceDescriptions> providerDescs) {
 
1152         ProviderId fallBackPrimary = null;
 
1153         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
 
1154             if (!e.getKey().isAncillary()) {
 
1156             } else if (fallBackPrimary == null) {
 
1157                 // pick randomly as a fallback in case there is no primary
 
1158                 fallBackPrimary = e.getKey();
 
1161         return fallBackPrimary;
 
1164     private DeviceDescriptions getPrimaryDescriptions(
 
1165             Map<ProviderId, DeviceDescriptions> providerDescs) {
 
1166         ProviderId pid = pickPrimaryPID(providerDescs);
 
1167         return providerDescs.get(pid);
 
1170     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
 
1171         clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
 
1174     private void broadcastMessage(MessageSubject subject, Object event) {
 
1175         clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
 
1178     private void notifyPeers(InternalDeviceEvent event) {
 
1179         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
 
1182     private void notifyPeers(InternalDeviceOfflineEvent event) {
 
1183         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
 
1186     private void notifyPeers(InternalDeviceRemovedEvent event) {
 
1187         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
 
1190     private void notifyPeers(InternalPortEvent event) {
 
1191         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
 
1194     private void notifyPeers(InternalPortStatusEvent event) {
 
1195         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
 
1198     private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
 
1200             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
 
1201         } catch (IOException e) {
 
1202             log.error("Failed to send" + event + " to " + recipient, e);
 
1206     private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
 
1208             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
 
1209         } catch (IOException e) {
 
1210             log.error("Failed to send" + event + " to " + recipient, e);
 
1214     private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
 
1216             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
 
1217         } catch (IOException e) {
 
1218             log.error("Failed to send" + event + " to " + recipient, e);
 
1222     private void notifyPeer(NodeId recipient, InternalPortEvent event) {
 
1224             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
 
1225         } catch (IOException e) {
 
1226             log.error("Failed to send" + event + " to " + recipient, e);
 
1230     private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
 
1232             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
 
1233         } catch (IOException e) {
 
1234             log.error("Failed to send" + event + " to " + recipient, e);
 
1238     private DeviceAntiEntropyAdvertisement createAdvertisement() {
 
1239         final NodeId self = clusterService.getLocalNode().id();
 
1241         final int numDevices = deviceDescs.size();
 
1242         Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
 
1243         final int portsPerDevice = 8; // random factor to minimize reallocation
 
1244         Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
 
1245         Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
 
1247         deviceDescs.forEach((deviceId, devDescs) -> {
 
1249             // for each Device...
 
1250             synchronized (devDescs) {
 
1252                 // send device offline timestamp
 
1253                 Timestamp lOffline = this.offline.get(deviceId);
 
1254                 if (lOffline != null) {
 
1255                     adOffline.put(deviceId, lOffline);
 
1258                 for (Entry<ProviderId, DeviceDescriptions>
 
1259                         prov : devDescs.entrySet()) {
 
1261                     // for each Provider Descriptions...
 
1262                     final ProviderId provId = prov.getKey();
 
1263                     final DeviceDescriptions descs = prov.getValue();
 
1265                     adDevices.put(new DeviceFragmentId(deviceId, provId),
 
1266                                   descs.getDeviceDesc().timestamp());
 
1268                     for (Entry<PortNumber, Timestamped<PortDescription>>
 
1269                             portDesc : descs.getPortDescs().entrySet()) {
 
1271                         final PortNumber number = portDesc.getKey();
 
1272                         adPorts.put(new PortFragmentId(deviceId, provId, number),
 
1273                                     portDesc.getValue().timestamp());
 
1279         return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
 
1283      * Responds to anti-entropy advertisement message.
 
1285      * Notify sender about out-dated information using regular replication message.
 
1286      * Send back advertisement to sender if not in sync.
 
1288      * @param advertisement to respond to
 
1290     private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
 
1292         final NodeId sender = advertisement.sender();
 
1294         Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
 
1295         Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
 
1296         Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
 
1298         // Fragments to request
 
1299         Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
 
1300         Collection<PortFragmentId> reqPorts = new ArrayList<>();
 
1302         for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
 
1303             final DeviceId deviceId = de.getKey();
 
1304             final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
 
1306             synchronized (lDevice) {
 
1307                 // latestTimestamp across provider
 
1308                 // Note: can be null initially
 
1309                 Timestamp localLatest = offline.get(deviceId);
 
1311                 // handle device Ads
 
1312                 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
 
1313                     final ProviderId provId = prov.getKey();
 
1314                     final DeviceDescriptions lDeviceDescs = prov.getValue();
 
1316                     final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
 
1319                     Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
 
1320                     Timestamp advDevTimestamp = devAds.get(devFragId);
 
1322                     if (advDevTimestamp == null || lProvDevice.isNewerThan(
 
1324                         // remote does not have it or outdated, suggest
 
1325                         notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
 
1326                     } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
 
1327                         // local is outdated, request
 
1328                         reqDevices.add(devFragId);
 
1332                     for (Entry<PortNumber, Timestamped<PortDescription>>
 
1333                             pe : lDeviceDescs.getPortDescs().entrySet()) {
 
1335                         final PortNumber num = pe.getKey();
 
1336                         final Timestamped<PortDescription> lPort = pe.getValue();
 
1338                         final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
 
1340                         Timestamp advPortTimestamp = portAds.get(portFragId);
 
1341                         if (advPortTimestamp == null || lPort.isNewerThan(
 
1342                                 advPortTimestamp)) {
 
1343                             // remote does not have it or outdated, suggest
 
1344                             notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
 
1345                         } else if (!lPort.timestamp().equals(advPortTimestamp)) {
 
1346                             // local is outdated, request
 
1347                             log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
 
1348                             reqPorts.add(portFragId);
 
1351                         // remove port Ad already processed
 
1352                         portAds.remove(portFragId);
 
1353                     } // end local port loop
 
1355                     // remove device Ad already processed
 
1356                     devAds.remove(devFragId);
 
1358                     // find latest and update
 
1359                     final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
 
1360                     if (localLatest == null ||
 
1361                             providerLatest.compareTo(localLatest) > 0) {
 
1362                         localLatest = providerLatest;
 
1364                 } // end local provider loop
 
1366                 // checking if remote timestamp is more recent.
 
1367                 Timestamp rOffline = offlineAds.get(deviceId);
 
1368                 if (rOffline != null &&
 
1369                         rOffline.compareTo(localLatest) > 0) {
 
1370                     // remote offline timestamp suggests that the
 
1371                     // device is off-line
 
1372                     markOfflineInternal(deviceId, rOffline);
 
1375                 Timestamp lOffline = offline.get(deviceId);
 
1376                 if (lOffline != null && rOffline == null) {
 
1377                     // locally offline, but remote is online, suggest offline
 
1378                     notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
 
1381                 // remove device offline Ad already processed
 
1382                 offlineAds.remove(deviceId);
 
1383             } // end local device loop
 
1386         // If there is any Ads left, request them
 
1387         log.trace("Ads left {}, {}", devAds, portAds);
 
1388         reqDevices.addAll(devAds.keySet());
 
1389         reqPorts.addAll(portAds.keySet());
 
1391         if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
 
1392             log.trace("Nothing to request to remote peer {}", sender);
 
1396         log.debug("Need to sync {} {}", reqDevices, reqPorts);
 
1398         // 2-way Anti-Entropy for now
 
1400             unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
 
1401         } catch (IOException e) {
 
1402             log.error("Failed to send response advertisement to " + sender, e);
 
1405 // Sketch of 3-way Anti-Entropy
 
1406 //        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
 
1407 //        ClusterMessage message = new ClusterMessage(
 
1408 //                clusterService.getLocalNode().id(),
 
1409 //                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
 
1410 //                SERIALIZER.encode(request));
 
1413 //            clusterCommunicator.unicast(message, advertisement.sender());
 
1414 //        } catch (IOException e) {
 
1415 //            log.error("Failed to send advertisement reply to "
 
1416 //                      + advertisement.sender(), e);
 
1420     private void notifyDelegateIfNotNull(DeviceEvent event) {
 
1421         if (event != null) {
 
1422             notifyDelegate(event);
 
1426     private final class SendAdvertisementTask implements Runnable {
 
1430             if (Thread.currentThread().isInterrupted()) {
 
1431                 log.debug("Interrupted, quitting");
 
1436                 final NodeId self = clusterService.getLocalNode().id();
 
1437                 Set<ControllerNode> nodes = clusterService.getNodes();
 
1439                 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
 
1440                         .transform(toNodeId())
 
1443                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
 
1444                     log.trace("No other peers in the cluster.");
 
1450                     int idx = RandomUtils.nextInt(0, nodeIds.size());
 
1451                     peer = nodeIds.get(idx);
 
1452                 } while (peer.equals(self));
 
1454                 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
 
1456                 if (Thread.currentThread().isInterrupted()) {
 
1457                     log.debug("Interrupted, quitting");
 
1462                     unicastMessage(peer, DEVICE_ADVERTISE, ad);
 
1463                 } catch (IOException e) {
 
1464                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
 
1467             } catch (Exception e) {
 
1468                 // catch all Exception to avoid Scheduled task being suppressed.
 
1469                 log.error("Exception thrown while sending advertisement", e);
 
1474     private final class InternalDeviceEventListener
 
1475             implements ClusterMessageHandler {
 
1477         public void handle(ClusterMessage message) {
 
1478             log.debug("Received device update event from peer: {}", message.sender());
 
1479             InternalDeviceEvent event = SERIALIZER.decode(message.payload());
 
1481             ProviderId providerId = event.providerId();
 
1482             DeviceId deviceId = event.deviceId();
 
1483             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
 
1486                 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
 
1487             } catch (Exception e) {
 
1488                 log.warn("Exception thrown handling device update", e);
 
1493     private final class InternalDeviceOfflineEventListener
 
1494             implements ClusterMessageHandler {
 
1496         public void handle(ClusterMessage message) {
 
1497             log.debug("Received device offline event from peer: {}", message.sender());
 
1498             InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
 
1500             DeviceId deviceId = event.deviceId();
 
1501             Timestamp timestamp = event.timestamp();
 
1504                 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
 
1505             } catch (Exception e) {
 
1506                 log.warn("Exception thrown handling device offline", e);
 
1511     private final class InternalRemoveRequestListener
 
1512             implements ClusterMessageHandler {
 
1514         public void handle(ClusterMessage message) {
 
1515             log.debug("Received device remove request from peer: {}", message.sender());
 
1516             DeviceId did = SERIALIZER.decode(message.payload());
 
1520             } catch (Exception e) {
 
1521                 log.warn("Exception thrown handling device remove", e);
 
1526     private final class InternalDeviceRemovedEventListener
 
1527             implements ClusterMessageHandler {
 
1529         public void handle(ClusterMessage message) {
 
1530             log.debug("Received device removed event from peer: {}", message.sender());
 
1531             InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
 
1533             DeviceId deviceId = event.deviceId();
 
1534             Timestamp timestamp = event.timestamp();
 
1537                 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
 
1538             } catch (Exception e) {
 
1539                 log.warn("Exception thrown handling device removed", e);
 
1544     private final class InternalPortEventListener
 
1545             implements ClusterMessageHandler {
 
1547         public void handle(ClusterMessage message) {
 
1549             log.debug("Received port update event from peer: {}", message.sender());
 
1550             InternalPortEvent event = SERIALIZER.decode(message.payload());
 
1552             ProviderId providerId = event.providerId();
 
1553             DeviceId deviceId = event.deviceId();
 
1554             Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
 
1556             if (getDevice(deviceId) == null) {
 
1557                 log.debug("{} not found on this node yet, ignoring.", deviceId);
 
1558                 // Note: dropped information will be recovered by anti-entropy
 
1563                 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
 
1564             } catch (Exception e) {
 
1565                 log.warn("Exception thrown handling port update", e);
 
1570     private final class InternalPortStatusEventListener
 
1571             implements ClusterMessageHandler {
 
1573         public void handle(ClusterMessage message) {
 
1575             log.debug("Received port status update event from peer: {}", message.sender());
 
1576             InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
 
1578             ProviderId providerId = event.providerId();
 
1579             DeviceId deviceId = event.deviceId();
 
1580             Timestamped<PortDescription> portDescription = event.portDescription();
 
1582             if (getDevice(deviceId) == null) {
 
1583                 log.debug("{} not found on this node yet, ignoring.", deviceId);
 
1584                 // Note: dropped information will be recovered by anti-entropy
 
1589                 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
 
1590             } catch (Exception e) {
 
1591                 log.warn("Exception thrown handling port update", e);
 
1596     private final class InternalDeviceAdvertisementListener
 
1597             implements ClusterMessageHandler {
 
1599         public void handle(ClusterMessage message) {
 
1600             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
 
1601             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
 
1603                 handleAdvertisement(advertisement);
 
1604             } catch (Exception e) {
 
1605                 log.warn("Exception thrown handling Device advertisements.", e);
 
1610     private final class DeviceInjectedEventListener
 
1611             implements ClusterMessageHandler {
 
1613         public void handle(ClusterMessage message) {
 
1614             log.debug("Received injected device event from peer: {}", message.sender());
 
1615             DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
 
1617             ProviderId providerId = event.providerId();
 
1618             DeviceId deviceId = event.deviceId();
 
1619             DeviceDescription deviceDescription = event.deviceDescription();
 
1620             if (!deviceClockService.isTimestampAvailable(deviceId)) {
 
1621                 // workaround for ONOS-1208
 
1622                 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
 
1627                 createOrUpdateDevice(providerId, deviceId, deviceDescription);
 
1628             } catch (Exception e) {
 
1629                 log.warn("Exception thrown handling device injected event.", e);
 
1634     private final class PortInjectedEventListener
 
1635             implements ClusterMessageHandler {
 
1637         public void handle(ClusterMessage message) {
 
1638             log.debug("Received injected port event from peer: {}", message.sender());
 
1639             PortInjectedEvent event = SERIALIZER.decode(message.payload());
 
1641             ProviderId providerId = event.providerId();
 
1642             DeviceId deviceId = event.deviceId();
 
1643             List<PortDescription> portDescriptions = event.portDescriptions();
 
1644             if (!deviceClockService.isTimestampAvailable(deviceId)) {
 
1645                 // workaround for ONOS-1208
 
1646                 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
 
1651                 updatePorts(providerId, deviceId, portDescriptions);
 
1652             } catch (Exception e) {
 
1653                 log.warn("Exception thrown handling port injected event.", e);
 
1658     private class InternalPortStatsListener
 
1659             implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
 
1661         public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
 
1662             if (event.type() == PUT) {
 
1663                 Device device = devices.get(event.key());
 
1664                 if (device != null) {
 
1665                     delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));