687762e050782f748bed18aa99620114ec151bb1
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.device.impl;
17
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
22
23 import org.apache.commons.lang3.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onlab.packet.ChassisId;
31 import org.onlab.util.KryoNamespace;
32 import org.onlab.util.NewConcurrentHashMap;
33 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.mastership.MastershipTerm;
38 import org.onosproject.mastership.MastershipTermService;
39 import org.onosproject.net.Annotations;
40 import org.onosproject.net.AnnotationsUtil;
41 import org.onosproject.net.DefaultAnnotations;
42 import org.onosproject.net.DefaultDevice;
43 import org.onosproject.net.DefaultPort;
44 import org.onosproject.net.Device;
45 import org.onosproject.net.Device.Type;
46 import org.onosproject.net.DeviceId;
47 import org.onosproject.net.MastershipRole;
48 import org.onosproject.net.OchPort;
49 import org.onosproject.net.OduCltPort;
50 import org.onosproject.net.OmsPort;
51 import org.onosproject.net.Port;
52 import org.onosproject.net.PortNumber;
53 import org.onosproject.net.device.DefaultPortStatistics;
54 import org.onosproject.net.device.DeviceClockService;
55 import org.onosproject.net.device.DeviceDescription;
56 import org.onosproject.net.device.DeviceEvent;
57 import org.onosproject.net.device.DeviceStore;
58 import org.onosproject.net.device.DeviceStoreDelegate;
59 import org.onosproject.net.device.OchPortDescription;
60 import org.onosproject.net.device.OduCltPortDescription;
61 import org.onosproject.net.device.OmsPortDescription;
62 import org.onosproject.net.device.PortDescription;
63 import org.onosproject.net.device.PortStatistics;
64 import org.onosproject.net.provider.ProviderId;
65 import org.onosproject.store.AbstractStore;
66 import org.onosproject.store.Timestamp;
67 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
68 import org.onosproject.store.cluster.messaging.ClusterMessage;
69 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
70 import org.onosproject.store.cluster.messaging.MessageSubject;
71 import org.onosproject.store.impl.Timestamped;
72 import org.onosproject.store.serializers.KryoNamespaces;
73 import org.onosproject.store.serializers.KryoSerializer;
74 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 import org.onosproject.store.service.EventuallyConsistentMap;
76 import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 import org.onosproject.store.service.EventuallyConsistentMapListener;
78 import org.onosproject.store.service.MultiValuedTimestamp;
79 import org.onosproject.store.service.StorageService;
80 import org.onosproject.store.service.WallClockTimestamp;
81 import org.slf4j.Logger;
82
83 import java.io.IOException;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.List;
91 import java.util.Map;
92 import java.util.Map.Entry;
93 import java.util.Objects;
94 import java.util.Set;
95 import java.util.concurrent.ConcurrentMap;
96 import java.util.concurrent.ExecutorService;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledExecutorService;
99 import java.util.concurrent.TimeUnit;
100
101 import static com.google.common.base.Preconditions.checkArgument;
102 import static com.google.common.base.Predicates.notNull;
103 import static com.google.common.base.Verify.verify;
104 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
105 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
106 import static org.onlab.util.Tools.groupedThreads;
107 import static org.onlab.util.Tools.minPriority;
108 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
109 import static org.onosproject.net.DefaultAnnotations.merge;
110 import static org.onosproject.net.device.DeviceEvent.Type.*;
111 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
112 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
113 import static org.slf4j.LoggerFactory.getLogger;
114
115 /**
116  * Manages inventory of infrastructure devices using gossip protocol to distribute
117  * information.
118  */
119 @Component(immediate = true)
120 @Service
121 public class GossipDeviceStore
122         extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
123         implements DeviceStore {
124
125     private final Logger log = getLogger(getClass());
126
127     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
128     // Timeout in milliseconds to process device or ports on remote master node
129     private static final int REMOTE_MASTER_TIMEOUT = 1000;
130
131     // innerMap is used to lock a Device, thus instance should never be replaced.
132     // collection of Description given from various providers
133     private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
134             deviceDescs = Maps.newConcurrentMap();
135
136     // cache of Device and Ports generated by compositing descriptions from providers
137     private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
138     private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
139
140     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
141     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
142     private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
143             portStatsListener = new InternalPortStatsListener();
144
145     // to be updated under Device lock
146     private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
147     private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
148
149     // available(=UP) devices
150     private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
151
152     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
153     protected DeviceClockService deviceClockService;
154
155     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
156     protected StorageService storageService;
157
158     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
159     protected ClusterCommunicationService clusterCommunicator;
160
161     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
162     protected ClusterService clusterService;
163
164     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
165     protected MastershipService mastershipService;
166
167     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168     protected MastershipTermService termService;
169
170
171     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
172         @Override
173         protected void setupKryoPool() {
174             serializerPool = KryoNamespace.newBuilder()
175                     .register(DistributedStoreSerializers.STORE_COMMON)
176                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
177                     .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
178                     .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
179                     .register(InternalDeviceRemovedEvent.class)
180                     .register(new InternalPortEventSerializer(), InternalPortEvent.class)
181                     .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
182                     .register(DeviceAntiEntropyAdvertisement.class)
183                     .register(DeviceFragmentId.class)
184                     .register(PortFragmentId.class)
185                     .register(DeviceInjectedEvent.class)
186                     .register(PortInjectedEvent.class)
187                     .build();
188         }
189     };
190
191     private ExecutorService executor;
192
193     private ScheduledExecutorService backgroundExecutor;
194
195     // TODO make these anti-entropy parameters configurable
196     private long initialDelaySec = 5;
197     private long periodSec = 5;
198
199     @Activate
200     public void activate() {
201         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
202
203         backgroundExecutor =
204                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
205
206         clusterCommunicator.addSubscriber(
207                 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
208         clusterCommunicator.addSubscriber(
209                 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
210                 new InternalDeviceOfflineEventListener(),
211                 executor);
212         clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
213                                           new InternalRemoveRequestListener(),
214                                           executor);
215         clusterCommunicator.addSubscriber(
216                 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
217         clusterCommunicator.addSubscriber(
218                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
219         clusterCommunicator.addSubscriber(
220                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
221         clusterCommunicator.addSubscriber(
222                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
223                 new InternalDeviceAdvertisementListener(),
224                 backgroundExecutor);
225         clusterCommunicator.addSubscriber(
226                 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
227         clusterCommunicator.addSubscriber(
228                 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
229
230         // start anti-entropy thread
231         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
232                                                initialDelaySec, periodSec, TimeUnit.SECONDS);
233
234         // Create a distributed map for port stats.
235         KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
236                 .register(KryoNamespaces.API)
237                 .register(DefaultPortStatistics.class)
238                 .register(DeviceId.class)
239                 .register(MultiValuedTimestamp.class)
240                 .register(WallClockTimestamp.class);
241
242         devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
243                 .withName("port-stats")
244                 .withSerializer(deviceDataSerializer)
245                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247                 .withTombstonesDisabled()
248                 .build();
249         devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
250                 eventuallyConsistentMapBuilder()
251                 .withName("port-stats-delta")
252                 .withSerializer(deviceDataSerializer)
253                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
254                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
255                 .withTombstonesDisabled()
256                 .build();
257         devicePortStats.addListener(portStatsListener);
258         log.info("Started");
259     }
260
261     @Deactivate
262     public void deactivate() {
263         devicePortStats.destroy();
264         devicePortDeltaStats.destroy();
265         executor.shutdownNow();
266
267         backgroundExecutor.shutdownNow();
268         try {
269             if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
270                 log.error("Timeout during executor shutdown");
271             }
272         } catch (InterruptedException e) {
273             log.error("Error during executor shutdown", e);
274         }
275
276         deviceDescs.clear();
277         devices.clear();
278         devicePorts.clear();
279         availableDevices.clear();
280         log.info("Stopped");
281     }
282
283     @Override
284     public int getDeviceCount() {
285         return devices.size();
286     }
287
288     @Override
289     public Iterable<Device> getDevices() {
290         return Collections.unmodifiableCollection(devices.values());
291     }
292
293     @Override
294     public Iterable<Device> getAvailableDevices() {
295         return FluentIterable.from(getDevices())
296                 .filter(input -> isAvailable(input.id()));
297     }
298
299     @Override
300     public Device getDevice(DeviceId deviceId) {
301         return devices.get(deviceId);
302     }
303
304     @Override
305     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
306                                                          DeviceId deviceId,
307                                                          DeviceDescription deviceDescription) {
308         NodeId localNode = clusterService.getLocalNode().id();
309         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
310
311         // Process device update only if we're the master,
312         // otherwise signal the actual master.
313         DeviceEvent deviceEvent = null;
314         if (localNode.equals(deviceNode)) {
315
316             final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
317             final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
318             final Timestamped<DeviceDescription> mergedDesc;
319             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
320
321             synchronized (device) {
322                 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
323                 mergedDesc = device.get(providerId).getDeviceDesc();
324             }
325
326             if (deviceEvent != null) {
327                 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
328                          providerId, deviceId);
329                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
330             }
331
332         } else {
333             // FIXME Temporary hack for NPE (ONOS-1171).
334             // Proper fix is to implement forwarding to master on ConfigProvider
335             // redo ONOS-490
336             if (deviceNode == null) {
337                 // silently ignore
338                 return null;
339             }
340
341
342             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
343                     providerId, deviceId, deviceDescription);
344
345             // TODO check unicast return value
346             clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
347             /* error log:
348             log.warn("Failed to process injected device id: {} desc: {} " +
349                             "(cluster messaging failed: {})",
350                     deviceId, deviceDescription, e);
351             */
352         }
353
354         return deviceEvent;
355     }
356
357     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
358                                                      DeviceId deviceId,
359                                                      Timestamped<DeviceDescription> deltaDesc) {
360
361         // Collection of DeviceDescriptions for a Device
362         Map<ProviderId, DeviceDescriptions> device
363                 = getOrCreateDeviceDescriptionsMap(deviceId);
364
365         synchronized (device) {
366             // locking per device
367
368             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
369                 log.debug("Ignoring outdated event: {}", deltaDesc);
370                 return null;
371             }
372
373             DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
374
375             final Device oldDevice = devices.get(deviceId);
376             final Device newDevice;
377
378             if (deltaDesc == descs.getDeviceDesc() ||
379                     deltaDesc.isNewer(descs.getDeviceDesc())) {
380                 // on new device or valid update
381                 descs.putDeviceDesc(deltaDesc);
382                 newDevice = composeDevice(deviceId, device);
383             } else {
384                 // outdated event, ignored.
385                 return null;
386             }
387             if (oldDevice == null) {
388                 // ADD
389                 return createDevice(providerId, newDevice, deltaDesc.timestamp());
390             } else {
391                 // UPDATE or ignore (no change or stale)
392                 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
393             }
394         }
395     }
396
397     // Creates the device and returns the appropriate event if necessary.
398     // Guarded by deviceDescs value (=Device lock)
399     private DeviceEvent createDevice(ProviderId providerId,
400                                      Device newDevice, Timestamp timestamp) {
401
402         // update composed device cache
403         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
404         verify(oldDevice == null,
405                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
406                providerId, oldDevice, newDevice);
407
408         if (!providerId.isAncillary()) {
409             markOnline(newDevice.id(), timestamp);
410         }
411
412         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
413     }
414
415     // Updates the device and returns the appropriate event if necessary.
416     // Guarded by deviceDescs value (=Device lock)
417     private DeviceEvent updateDevice(ProviderId providerId,
418                                      Device oldDevice,
419                                      Device newDevice, Timestamp newTimestamp) {
420         // We allow only certain attributes to trigger update
421         boolean propertiesChanged =
422                 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
423                         !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
424                         !Objects.equals(oldDevice.providerId(), newDevice.providerId());
425         boolean annotationsChanged =
426                 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
427
428         // Primary providers can respond to all changes, but ancillary ones
429         // should respond only to annotation changes.
430         DeviceEvent event = null;
431         if ((providerId.isAncillary() && annotationsChanged) ||
432                 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
433             boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
434             if (!replaced) {
435                 verify(replaced,
436                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
437                        providerId, oldDevice, devices.get(newDevice.id())
438                         , newDevice);
439             }
440
441             event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
442         }
443
444         if (!providerId.isAncillary()) {
445             boolean wasOnline = availableDevices.contains(newDevice.id());
446             markOnline(newDevice.id(), newTimestamp);
447             if (!wasOnline) {
448                 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
449             }
450         }
451         return event;
452     }
453
454     @Override
455     public DeviceEvent markOffline(DeviceId deviceId) {
456         final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
457         final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
458         if (event != null) {
459             log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
460                      deviceId, timestamp);
461             notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
462         }
463         return event;
464     }
465
466     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
467
468         Map<ProviderId, DeviceDescriptions> providerDescs
469                 = getOrCreateDeviceDescriptionsMap(deviceId);
470
471         // locking device
472         synchronized (providerDescs) {
473
474             // accept off-line if given timestamp is newer than
475             // the latest Timestamp from Primary provider
476             DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
477             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
478             if (timestamp.compareTo(lastTimestamp) <= 0) {
479                 // outdated event ignore
480                 return null;
481             }
482
483             offline.put(deviceId, timestamp);
484
485             Device device = devices.get(deviceId);
486             if (device == null) {
487                 return null;
488             }
489             boolean removed = availableDevices.remove(deviceId);
490             if (removed) {
491                 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
492             }
493             return null;
494         }
495     }
496
497     /**
498      * Marks the device as available if the given timestamp is not outdated,
499      * compared to the time the device has been marked offline.
500      *
501      * @param deviceId  identifier of the device
502      * @param timestamp of the event triggering this change.
503      * @return true if availability change request was accepted and changed the state
504      */
505     // Guarded by deviceDescs value (=Device lock)
506     private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
507         // accept on-line if given timestamp is newer than
508         // the latest offline request Timestamp
509         Timestamp offlineTimestamp = offline.get(deviceId);
510         if (offlineTimestamp == null ||
511                 offlineTimestamp.compareTo(timestamp) < 0) {
512
513             offline.remove(deviceId);
514             return availableDevices.add(deviceId);
515         }
516         return false;
517     }
518
519     @Override
520     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
521                                                       DeviceId deviceId,
522                                                       List<PortDescription> portDescriptions) {
523
524         NodeId localNode = clusterService.getLocalNode().id();
525         // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
526         // since it will trigger distributed store read.
527         // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
528         // outside Device subsystem. so that we don't have to modify both Device and Link stores.
529         // If we don't care much about topology performance, then it might be OK.
530         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
531
532         // Process port update only if we're the master of the device,
533         // otherwise signal the actual master.
534         List<DeviceEvent> deviceEvents = null;
535         if (localNode.equals(deviceNode)) {
536
537             final Timestamp newTimestamp;
538             try {
539                 newTimestamp = deviceClockService.getTimestamp(deviceId);
540             } catch (IllegalStateException e) {
541                 log.info("Timestamp was not available for device {}", deviceId);
542                 log.debug("  discarding {}", portDescriptions);
543                 // Failed to generate timestamp.
544
545                 // Possible situation:
546                 //  Device connected and became master for short period of time,
547                 // but lost mastership before this instance had the chance to
548                 // retrieve term information.
549
550                 // Information dropped here is expected to be recoverable by
551                 // device probing after mastership change
552
553                 return Collections.emptyList();
554             }
555             log.debug("timestamp for {} {}", deviceId, newTimestamp);
556
557             final Timestamped<List<PortDescription>> timestampedInput
558                     = new Timestamped<>(portDescriptions, newTimestamp);
559             final Timestamped<List<PortDescription>> merged;
560
561             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
562
563             synchronized (device) {
564                 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
565                 final DeviceDescriptions descs = device.get(providerId);
566                 List<PortDescription> mergedList =
567                         FluentIterable.from(portDescriptions)
568                                 .transform(input ->
569                                     // lookup merged port description
570                                     descs.getPortDesc(input.portNumber()).value()
571                                 ).toList();
572                 merged = new Timestamped<>(mergedList, newTimestamp);
573             }
574
575             if (!deviceEvents.isEmpty()) {
576                 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
577                          providerId, deviceId);
578                 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
579             }
580
581         } else {
582             // FIXME Temporary hack for NPE (ONOS-1171).
583             // Proper fix is to implement forwarding to master on ConfigProvider
584             // redo ONOS-490
585             if (deviceNode == null) {
586                 // silently ignore
587                 return Collections.emptyList();
588             }
589
590             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
591
592             //TODO check unicast return value
593             clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
594             /* error log:
595             log.warn("Failed to process injected ports of device id: {} " +
596                             "(cluster messaging failed: {})",
597                     deviceId, e);
598             */
599         }
600
601         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
602     }
603
604     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
605                                                   DeviceId deviceId,
606                                                   Timestamped<List<PortDescription>> portDescriptions) {
607
608         Device device = devices.get(deviceId);
609         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
610
611         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
612         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
613
614         List<DeviceEvent> events = new ArrayList<>();
615         synchronized (descsMap) {
616
617             if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
618                 log.debug("Ignoring outdated events: {}", portDescriptions);
619                 return Collections.emptyList();
620             }
621
622             DeviceDescriptions descs = descsMap.get(providerId);
623             // every provider must provide DeviceDescription.
624             checkArgument(descs != null,
625                           "Device description for Device ID %s from Provider %s was not found",
626                           deviceId, providerId);
627
628             Map<PortNumber, Port> ports = getPortMap(deviceId);
629
630             final Timestamp newTimestamp = portDescriptions.timestamp();
631
632             // Add new ports
633             Set<PortNumber> processed = new HashSet<>();
634             for (PortDescription portDescription : portDescriptions.value()) {
635                 final PortNumber number = portDescription.portNumber();
636                 processed.add(number);
637
638                 final Port oldPort = ports.get(number);
639                 final Port newPort;
640
641
642                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
643                 if (existingPortDesc == null ||
644                         newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
645                     // on new port or valid update
646                     // update description
647                     descs.putPortDesc(new Timestamped<>(portDescription,
648                                                         portDescriptions.timestamp()));
649                     newPort = composePort(device, number, descsMap);
650                 } else {
651                     // outdated event, ignored.
652                     continue;
653                 }
654
655                 events.add(oldPort == null ?
656                                    createPort(device, newPort, ports) :
657                                    updatePort(device, oldPort, newPort, ports));
658             }
659
660             events.addAll(pruneOldPorts(device, ports, processed));
661         }
662         return FluentIterable.from(events).filter(notNull()).toList();
663     }
664
665     // Creates a new port based on the port description adds it to the map and
666     // Returns corresponding event.
667     // Guarded by deviceDescs value (=Device lock)
668     private DeviceEvent createPort(Device device, Port newPort,
669                                    Map<PortNumber, Port> ports) {
670         ports.put(newPort.number(), newPort);
671         return new DeviceEvent(PORT_ADDED, device, newPort);
672     }
673
674     // Checks if the specified port requires update and if so, it replaces the
675     // existing entry in the map and returns corresponding event.
676     // Guarded by deviceDescs value (=Device lock)
677     private DeviceEvent updatePort(Device device, Port oldPort,
678                                    Port newPort,
679                                    Map<PortNumber, Port> ports) {
680         if (oldPort.isEnabled() != newPort.isEnabled() ||
681                 oldPort.type() != newPort.type() ||
682                 oldPort.portSpeed() != newPort.portSpeed() ||
683                 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
684             ports.put(oldPort.number(), newPort);
685             return new DeviceEvent(PORT_UPDATED, device, newPort);
686         }
687         return null;
688     }
689
690     // Prunes the specified list of ports based on which ports are in the
691     // processed list and returns list of corresponding events.
692     // Guarded by deviceDescs value (=Device lock)
693     private List<DeviceEvent> pruneOldPorts(Device device,
694                                             Map<PortNumber, Port> ports,
695                                             Set<PortNumber> processed) {
696         List<DeviceEvent> events = new ArrayList<>();
697         Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
698         while (iterator.hasNext()) {
699             Entry<PortNumber, Port> e = iterator.next();
700             PortNumber portNumber = e.getKey();
701             if (!processed.contains(portNumber)) {
702                 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
703                 iterator.remove();
704             }
705         }
706         return events;
707     }
708
709     // Gets the map of ports for the specified device; if one does not already
710     // exist, it creates and registers a new one.
711     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
712         return createIfAbsentUnchecked(devicePorts, deviceId,
713                                        NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
714     }
715
716     private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
717             DeviceId deviceId) {
718         Map<ProviderId, DeviceDescriptions> r;
719         r = deviceDescs.get(deviceId);
720         if (r == null) {
721             r = new HashMap<>();
722             final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
723             concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
724             if (concurrentlyAdded != null) {
725                 r = concurrentlyAdded;
726             }
727         }
728         return r;
729     }
730
731     // Guarded by deviceDescs value (=Device lock)
732     private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
733             Map<ProviderId, DeviceDescriptions> device,
734             ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
735         synchronized (device) {
736             DeviceDescriptions r = device.get(providerId);
737             if (r == null) {
738                 r = new DeviceDescriptions(deltaDesc);
739                 device.put(providerId, r);
740             }
741             return r;
742         }
743     }
744
745     @Override
746     public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
747                                                      DeviceId deviceId,
748                                                      PortDescription portDescription) {
749         final Timestamp newTimestamp;
750         try {
751             newTimestamp = deviceClockService.getTimestamp(deviceId);
752         } catch (IllegalStateException e) {
753             log.info("Timestamp was not available for device {}", deviceId);
754             log.debug("  discarding {}", portDescription);
755             // Failed to generate timestamp. Ignoring.
756             // See updatePorts comment
757             return null;
758         }
759         final Timestamped<PortDescription> deltaDesc
760                 = new Timestamped<>(portDescription, newTimestamp);
761         final DeviceEvent event;
762         final Timestamped<PortDescription> mergedDesc;
763         final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
764         synchronized (device) {
765             event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
766             mergedDesc = device.get(providerId)
767                     .getPortDesc(portDescription.portNumber());
768         }
769         if (event != null) {
770             log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
771                      providerId, deviceId);
772             notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
773         }
774         return event;
775     }
776
777     private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
778                                                  Timestamped<PortDescription> deltaDesc) {
779         Device device = devices.get(deviceId);
780         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
781
782         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
783         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
784
785         synchronized (descsMap) {
786
787             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
788                 log.debug("Ignoring outdated event: {}", deltaDesc);
789                 return null;
790             }
791
792             DeviceDescriptions descs = descsMap.get(providerId);
793             // assuming all providers must to give DeviceDescription
794             verify(descs != null,
795                    "Device description for Device ID %s from Provider %s was not found",
796                    deviceId, providerId);
797
798             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
799             final PortNumber number = deltaDesc.value().portNumber();
800             final Port oldPort = ports.get(number);
801             final Port newPort;
802
803             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
804             if (existingPortDesc == null ||
805                     deltaDesc.isNewer(existingPortDesc)) {
806                 // on new port or valid update
807                 // update description
808                 descs.putPortDesc(deltaDesc);
809                 newPort = composePort(device, number, descsMap);
810             } else {
811                 // same or outdated event, ignored.
812                 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
813                 return null;
814             }
815
816             if (oldPort == null) {
817                 return createPort(device, newPort, ports);
818             } else {
819                 return updatePort(device, oldPort, newPort, ports);
820             }
821         }
822     }
823
824     @Override
825     public List<Port> getPorts(DeviceId deviceId) {
826         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
827         if (ports == null) {
828             return Collections.emptyList();
829         }
830         return ImmutableList.copyOf(ports.values());
831     }
832
833     @Override
834     public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
835                                             Collection<PortStatistics> newStatsCollection) {
836
837         Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
838         Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
839         Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
840
841         if (prvStatsMap != null) {
842             for (PortStatistics newStats : newStatsCollection) {
843                 PortNumber port = PortNumber.portNumber(newStats.port());
844                 PortStatistics prvStats = prvStatsMap.get(port);
845                 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
846                 PortStatistics deltaStats = builder.build();
847                 if (prvStats != null) {
848                     deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
849                 }
850                 deltaStatsMap.put(port, deltaStats);
851                 newStatsMap.put(port, newStats);
852             }
853         } else {
854             for (PortStatistics newStats : newStatsCollection) {
855                 PortNumber port = PortNumber.portNumber(newStats.port());
856                 newStatsMap.put(port, newStats);
857             }
858         }
859         devicePortDeltaStats.put(deviceId, deltaStatsMap);
860         devicePortStats.put(deviceId, newStatsMap);
861         // DeviceEvent returns null because of InternalPortStatsListener usage
862         return null;
863     }
864
865     /**
866      * Calculate delta statistics by subtracting previous from new statistics.
867      *
868      * @param deviceId device identifier
869      * @param prvStats previous port statistics
870      * @param newStats new port statistics
871      * @return PortStatistics
872      */
873     public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
874         // calculate time difference
875         long deltaStatsSec, deltaStatsNano;
876         if (newStats.durationNano() < prvStats.durationNano()) {
877             deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
878             deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
879         } else {
880             deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
881             deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
882         }
883         DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
884         DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
885                 .setPort(newStats.port())
886                 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
887                 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
888                 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
889                 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
890                 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
891                 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
892                 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
893                 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
894                 .setDurationSec(deltaStatsSec)
895                 .setDurationNano(deltaStatsNano)
896                 .build();
897         return deltaStats;
898     }
899
900     @Override
901     public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
902         Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
903         if (portStats == null) {
904             return Collections.emptyList();
905         }
906         return ImmutableList.copyOf(portStats.values());
907     }
908
909     @Override
910     public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
911         Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
912         if (portStats == null) {
913             return Collections.emptyList();
914         }
915         return ImmutableList.copyOf(portStats.values());
916     }
917
918     @Override
919     public Port getPort(DeviceId deviceId, PortNumber portNumber) {
920         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
921         return ports == null ? null : ports.get(portNumber);
922     }
923
924     @Override
925     public boolean isAvailable(DeviceId deviceId) {
926         return availableDevices.contains(deviceId);
927     }
928
929     @Override
930     public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
931         final NodeId myId = clusterService.getLocalNode().id();
932         NodeId master = mastershipService.getMasterFor(deviceId);
933
934         // if there exist a master, forward
935         // if there is no master, try to become one and process
936
937         boolean relinquishAtEnd = false;
938         if (master == null) {
939             final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
940             if (myRole != MastershipRole.NONE) {
941                 relinquishAtEnd = true;
942             }
943             log.debug("Temporarily requesting role for {} to remove", deviceId);
944             mastershipService.requestRoleFor(deviceId);
945             MastershipTerm term = termService.getMastershipTerm(deviceId);
946             if (term != null && myId.equals(term.master())) {
947                 master = myId;
948             }
949         }
950
951         if (!myId.equals(master)) {
952             log.debug("{} has control of {}, forwarding remove request",
953                       master, deviceId);
954
955             // TODO check unicast return value
956             clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
957              /* error log:
958              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
959              */
960
961             // event will be triggered after master processes it.
962             return null;
963         }
964
965         // I have control..
966
967         Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
968         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
969         if (event != null) {
970             log.debug("Notifying peers of a device removed topology event for deviceId: {}",
971                       deviceId);
972             notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
973         }
974         if (relinquishAtEnd) {
975             log.debug("Relinquishing temporary role acquired for {}", deviceId);
976             mastershipService.relinquishMastership(deviceId);
977         }
978         return event;
979     }
980
981     private DeviceEvent removeDeviceInternal(DeviceId deviceId,
982                                              Timestamp timestamp) {
983
984         Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
985         synchronized (descs) {
986             // accept removal request if given timestamp is newer than
987             // the latest Timestamp from Primary provider
988             DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
989             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
990             if (timestamp.compareTo(lastTimestamp) <= 0) {
991                 // outdated event ignore
992                 return null;
993             }
994             removalRequest.put(deviceId, timestamp);
995
996             Device device = devices.remove(deviceId);
997             // should DEVICE_REMOVED carry removed ports?
998             Map<PortNumber, Port> ports = devicePorts.get(deviceId);
999             if (ports != null) {
1000                 ports.clear();
1001             }
1002             markOfflineInternal(deviceId, timestamp);
1003             descs.clear();
1004             return device == null ? null :
1005                     new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1006         }
1007     }
1008
1009     /**
1010      * Checks if given timestamp is superseded by removal request
1011      * with more recent timestamp.
1012      *
1013      * @param deviceId         identifier of a device
1014      * @param timestampToCheck timestamp of an event to check
1015      * @return true if device is already removed
1016      */
1017     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1018         Timestamp removalTimestamp = removalRequest.get(deviceId);
1019         if (removalTimestamp != null &&
1020                 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1021             // removalRequest is more recent
1022             return true;
1023         }
1024         return false;
1025     }
1026
1027     /**
1028      * Returns a Device, merging description given from multiple Providers.
1029      *
1030      * @param deviceId      device identifier
1031      * @param providerDescs Collection of Descriptions from multiple providers
1032      * @return Device instance
1033      */
1034     private Device composeDevice(DeviceId deviceId,
1035                                  Map<ProviderId, DeviceDescriptions> providerDescs) {
1036
1037         checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1038
1039         ProviderId primary = pickPrimaryPID(providerDescs);
1040
1041         DeviceDescriptions desc = providerDescs.get(primary);
1042
1043         final DeviceDescription base = desc.getDeviceDesc().value();
1044         Type type = base.type();
1045         String manufacturer = base.manufacturer();
1046         String hwVersion = base.hwVersion();
1047         String swVersion = base.swVersion();
1048         String serialNumber = base.serialNumber();
1049         ChassisId chassisId = base.chassisId();
1050         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1051         annotations = merge(annotations, base.annotations());
1052
1053         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1054             if (e.getKey().equals(primary)) {
1055                 continue;
1056             }
1057             // Note: should keep track of Description timestamp in the future
1058             // and only merge conflicting keys when timestamp is newer.
1059             // Currently assuming there will never be a key conflict between
1060             // providers
1061
1062             // annotation merging. not so efficient, should revisit later
1063             annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1064         }
1065
1066         return new DefaultDevice(primary, deviceId, type, manufacturer,
1067                                  hwVersion, swVersion, serialNumber,
1068                                  chassisId, annotations);
1069     }
1070
1071     private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1072                                  PortDescription description, Annotations annotations) {
1073         switch (description.type()) {
1074             case OMS:
1075                 OmsPortDescription omsDesc = (OmsPortDescription) description;
1076                 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1077                         omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1078             case OCH:
1079                 OchPortDescription ochDesc = (OchPortDescription) description;
1080                 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1081                         ochDesc.isTunable(), ochDesc.lambda(), annotations);
1082             case ODUCLT:
1083                 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1084                 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1085             default:
1086                 return new DefaultPort(device, number, isEnabled, description.type(),
1087                         description.portSpeed(), annotations);
1088         }
1089     }
1090
1091     /**
1092      * Returns a Port, merging description given from multiple Providers.
1093      *
1094      * @param device   device the port is on
1095      * @param number   port number
1096      * @param descsMap Collection of Descriptions from multiple providers
1097      * @return Port instance
1098      */
1099     private Port composePort(Device device, PortNumber number,
1100                              Map<ProviderId, DeviceDescriptions> descsMap) {
1101
1102         ProviderId primary = pickPrimaryPID(descsMap);
1103         DeviceDescriptions primDescs = descsMap.get(primary);
1104         // if no primary, assume not enabled
1105         boolean isEnabled = false;
1106         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1107         Timestamp newest = null;
1108         final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1109         if (portDesc != null) {
1110             isEnabled = portDesc.value().isEnabled();
1111             annotations = merge(annotations, portDesc.value().annotations());
1112             newest = portDesc.timestamp();
1113         }
1114         Port updated = null;
1115         for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1116             if (e.getKey().equals(primary)) {
1117                 continue;
1118             }
1119             // Note: should keep track of Description timestamp in the future
1120             // and only merge conflicting keys when timestamp is newer.
1121             // Currently assuming there will never be a key conflict between
1122             // providers
1123
1124             // annotation merging. not so efficient, should revisit later
1125             final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1126             if (otherPortDesc != null) {
1127                 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1128                     continue;
1129                 }
1130                 annotations = merge(annotations, otherPortDesc.value().annotations());
1131                 PortDescription other = otherPortDesc.value();
1132                 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1133                 newest = otherPortDesc.timestamp();
1134             }
1135         }
1136         if (portDesc == null) {
1137             return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1138         }
1139         PortDescription current = portDesc.value();
1140         return updated == null
1141                 ? buildTypedPort(device, number, isEnabled, current, annotations)
1142                 : updated;
1143     }
1144
1145     /**
1146      * @return primary ProviderID, or randomly chosen one if none exists
1147      */
1148     private ProviderId pickPrimaryPID(
1149             Map<ProviderId, DeviceDescriptions> providerDescs) {
1150         ProviderId fallBackPrimary = null;
1151         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1152             if (!e.getKey().isAncillary()) {
1153                 return e.getKey();
1154             } else if (fallBackPrimary == null) {
1155                 // pick randomly as a fallback in case there is no primary
1156                 fallBackPrimary = e.getKey();
1157             }
1158         }
1159         return fallBackPrimary;
1160     }
1161
1162     private DeviceDescriptions getPrimaryDescriptions(
1163             Map<ProviderId, DeviceDescriptions> providerDescs) {
1164         ProviderId pid = pickPrimaryPID(providerDescs);
1165         return providerDescs.get(pid);
1166     }
1167
1168     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1169         clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1170     }
1171
1172     private void broadcastMessage(MessageSubject subject, Object event) {
1173         clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1174     }
1175
1176     private void notifyPeers(InternalDeviceEvent event) {
1177         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1178     }
1179
1180     private void notifyPeers(InternalDeviceOfflineEvent event) {
1181         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1182     }
1183
1184     private void notifyPeers(InternalDeviceRemovedEvent event) {
1185         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1186     }
1187
1188     private void notifyPeers(InternalPortEvent event) {
1189         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1190     }
1191
1192     private void notifyPeers(InternalPortStatusEvent event) {
1193         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1194     }
1195
1196     private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1197         try {
1198             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1199         } catch (IOException e) {
1200             log.error("Failed to send" + event + " to " + recipient, e);
1201         }
1202     }
1203
1204     private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1205         try {
1206             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1207         } catch (IOException e) {
1208             log.error("Failed to send" + event + " to " + recipient, e);
1209         }
1210     }
1211
1212     private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1213         try {
1214             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1215         } catch (IOException e) {
1216             log.error("Failed to send" + event + " to " + recipient, e);
1217         }
1218     }
1219
1220     private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1221         try {
1222             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1223         } catch (IOException e) {
1224             log.error("Failed to send" + event + " to " + recipient, e);
1225         }
1226     }
1227
1228     private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1229         try {
1230             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1231         } catch (IOException e) {
1232             log.error("Failed to send" + event + " to " + recipient, e);
1233         }
1234     }
1235
1236     private DeviceAntiEntropyAdvertisement createAdvertisement() {
1237         final NodeId self = clusterService.getLocalNode().id();
1238
1239         final int numDevices = deviceDescs.size();
1240         Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1241         final int portsPerDevice = 8; // random factor to minimize reallocation
1242         Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1243         Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1244
1245         deviceDescs.forEach((deviceId, devDescs) -> {
1246
1247             // for each Device...
1248             synchronized (devDescs) {
1249
1250                 // send device offline timestamp
1251                 Timestamp lOffline = this.offline.get(deviceId);
1252                 if (lOffline != null) {
1253                     adOffline.put(deviceId, lOffline);
1254                 }
1255
1256                 for (Entry<ProviderId, DeviceDescriptions>
1257                         prov : devDescs.entrySet()) {
1258
1259                     // for each Provider Descriptions...
1260                     final ProviderId provId = prov.getKey();
1261                     final DeviceDescriptions descs = prov.getValue();
1262
1263                     adDevices.put(new DeviceFragmentId(deviceId, provId),
1264                                   descs.getDeviceDesc().timestamp());
1265
1266                     for (Entry<PortNumber, Timestamped<PortDescription>>
1267                             portDesc : descs.getPortDescs().entrySet()) {
1268
1269                         final PortNumber number = portDesc.getKey();
1270                         adPorts.put(new PortFragmentId(deviceId, provId, number),
1271                                     portDesc.getValue().timestamp());
1272                     }
1273                 }
1274             }
1275         });
1276
1277         return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1278     }
1279
1280     /**
1281      * Responds to anti-entropy advertisement message.
1282      * <p/>
1283      * Notify sender about out-dated information using regular replication message.
1284      * Send back advertisement to sender if not in sync.
1285      *
1286      * @param advertisement to respond to
1287      */
1288     private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1289
1290         final NodeId sender = advertisement.sender();
1291
1292         Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1293         Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1294         Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1295
1296         // Fragments to request
1297         Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1298         Collection<PortFragmentId> reqPorts = new ArrayList<>();
1299
1300         for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1301             final DeviceId deviceId = de.getKey();
1302             final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1303
1304             synchronized (lDevice) {
1305                 // latestTimestamp across provider
1306                 // Note: can be null initially
1307                 Timestamp localLatest = offline.get(deviceId);
1308
1309                 // handle device Ads
1310                 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1311                     final ProviderId provId = prov.getKey();
1312                     final DeviceDescriptions lDeviceDescs = prov.getValue();
1313
1314                     final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1315
1316
1317                     Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1318                     Timestamp advDevTimestamp = devAds.get(devFragId);
1319
1320                     if (advDevTimestamp == null || lProvDevice.isNewerThan(
1321                             advDevTimestamp)) {
1322                         // remote does not have it or outdated, suggest
1323                         notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1324                     } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1325                         // local is outdated, request
1326                         reqDevices.add(devFragId);
1327                     }
1328
1329                     // handle port Ads
1330                     for (Entry<PortNumber, Timestamped<PortDescription>>
1331                             pe : lDeviceDescs.getPortDescs().entrySet()) {
1332
1333                         final PortNumber num = pe.getKey();
1334                         final Timestamped<PortDescription> lPort = pe.getValue();
1335
1336                         final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1337
1338                         Timestamp advPortTimestamp = portAds.get(portFragId);
1339                         if (advPortTimestamp == null || lPort.isNewerThan(
1340                                 advPortTimestamp)) {
1341                             // remote does not have it or outdated, suggest
1342                             notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1343                         } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1344                             // local is outdated, request
1345                             log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1346                             reqPorts.add(portFragId);
1347                         }
1348
1349                         // remove port Ad already processed
1350                         portAds.remove(portFragId);
1351                     } // end local port loop
1352
1353                     // remove device Ad already processed
1354                     devAds.remove(devFragId);
1355
1356                     // find latest and update
1357                     final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1358                     if (localLatest == null ||
1359                             providerLatest.compareTo(localLatest) > 0) {
1360                         localLatest = providerLatest;
1361                     }
1362                 } // end local provider loop
1363
1364                 // checking if remote timestamp is more recent.
1365                 Timestamp rOffline = offlineAds.get(deviceId);
1366                 if (rOffline != null &&
1367                         rOffline.compareTo(localLatest) > 0) {
1368                     // remote offline timestamp suggests that the
1369                     // device is off-line
1370                     markOfflineInternal(deviceId, rOffline);
1371                 }
1372
1373                 Timestamp lOffline = offline.get(deviceId);
1374                 if (lOffline != null && rOffline == null) {
1375                     // locally offline, but remote is online, suggest offline
1376                     notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1377                 }
1378
1379                 // remove device offline Ad already processed
1380                 offlineAds.remove(deviceId);
1381             } // end local device loop
1382         } // device lock
1383
1384         // If there is any Ads left, request them
1385         log.trace("Ads left {}, {}", devAds, portAds);
1386         reqDevices.addAll(devAds.keySet());
1387         reqPorts.addAll(portAds.keySet());
1388
1389         if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1390             log.trace("Nothing to request to remote peer {}", sender);
1391             return;
1392         }
1393
1394         log.debug("Need to sync {} {}", reqDevices, reqPorts);
1395
1396         // 2-way Anti-Entropy for now
1397         try {
1398             unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1399         } catch (IOException e) {
1400             log.error("Failed to send response advertisement to " + sender, e);
1401         }
1402
1403 // Sketch of 3-way Anti-Entropy
1404 //        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1405 //        ClusterMessage message = new ClusterMessage(
1406 //                clusterService.getLocalNode().id(),
1407 //                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1408 //                SERIALIZER.encode(request));
1409 //
1410 //        try {
1411 //            clusterCommunicator.unicast(message, advertisement.sender());
1412 //        } catch (IOException e) {
1413 //            log.error("Failed to send advertisement reply to "
1414 //                      + advertisement.sender(), e);
1415 //        }
1416     }
1417
1418     private void notifyDelegateIfNotNull(DeviceEvent event) {
1419         if (event != null) {
1420             notifyDelegate(event);
1421         }
1422     }
1423
1424     private final class SendAdvertisementTask implements Runnable {
1425
1426         @Override
1427         public void run() {
1428             if (Thread.currentThread().isInterrupted()) {
1429                 log.debug("Interrupted, quitting");
1430                 return;
1431             }
1432
1433             try {
1434                 final NodeId self = clusterService.getLocalNode().id();
1435                 Set<ControllerNode> nodes = clusterService.getNodes();
1436
1437                 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1438                         .transform(toNodeId())
1439                         .toList();
1440
1441                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1442                     log.trace("No other peers in the cluster.");
1443                     return;
1444                 }
1445
1446                 NodeId peer;
1447                 do {
1448                     int idx = RandomUtils.nextInt(0, nodeIds.size());
1449                     peer = nodeIds.get(idx);
1450                 } while (peer.equals(self));
1451
1452                 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1453
1454                 if (Thread.currentThread().isInterrupted()) {
1455                     log.debug("Interrupted, quitting");
1456                     return;
1457                 }
1458
1459                 try {
1460                     unicastMessage(peer, DEVICE_ADVERTISE, ad);
1461                 } catch (IOException e) {
1462                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
1463                     return;
1464                 }
1465             } catch (Exception e) {
1466                 // catch all Exception to avoid Scheduled task being suppressed.
1467                 log.error("Exception thrown while sending advertisement", e);
1468             }
1469         }
1470     }
1471
1472     private final class InternalDeviceEventListener
1473             implements ClusterMessageHandler {
1474         @Override
1475         public void handle(ClusterMessage message) {
1476             log.debug("Received device update event from peer: {}", message.sender());
1477             InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1478
1479             ProviderId providerId = event.providerId();
1480             DeviceId deviceId = event.deviceId();
1481             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1482
1483             try {
1484                 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1485             } catch (Exception e) {
1486                 log.warn("Exception thrown handling device update", e);
1487             }
1488         }
1489     }
1490
1491     private final class InternalDeviceOfflineEventListener
1492             implements ClusterMessageHandler {
1493         @Override
1494         public void handle(ClusterMessage message) {
1495             log.debug("Received device offline event from peer: {}", message.sender());
1496             InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1497
1498             DeviceId deviceId = event.deviceId();
1499             Timestamp timestamp = event.timestamp();
1500
1501             try {
1502                 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1503             } catch (Exception e) {
1504                 log.warn("Exception thrown handling device offline", e);
1505             }
1506         }
1507     }
1508
1509     private final class InternalRemoveRequestListener
1510             implements ClusterMessageHandler {
1511         @Override
1512         public void handle(ClusterMessage message) {
1513             log.debug("Received device remove request from peer: {}", message.sender());
1514             DeviceId did = SERIALIZER.decode(message.payload());
1515
1516             try {
1517                 removeDevice(did);
1518             } catch (Exception e) {
1519                 log.warn("Exception thrown handling device remove", e);
1520             }
1521         }
1522     }
1523
1524     private final class InternalDeviceRemovedEventListener
1525             implements ClusterMessageHandler {
1526         @Override
1527         public void handle(ClusterMessage message) {
1528             log.debug("Received device removed event from peer: {}", message.sender());
1529             InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1530
1531             DeviceId deviceId = event.deviceId();
1532             Timestamp timestamp = event.timestamp();
1533
1534             try {
1535                 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1536             } catch (Exception e) {
1537                 log.warn("Exception thrown handling device removed", e);
1538             }
1539         }
1540     }
1541
1542     private final class InternalPortEventListener
1543             implements ClusterMessageHandler {
1544         @Override
1545         public void handle(ClusterMessage message) {
1546
1547             log.debug("Received port update event from peer: {}", message.sender());
1548             InternalPortEvent event = SERIALIZER.decode(message.payload());
1549
1550             ProviderId providerId = event.providerId();
1551             DeviceId deviceId = event.deviceId();
1552             Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1553
1554             if (getDevice(deviceId) == null) {
1555                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1556                 // Note: dropped information will be recovered by anti-entropy
1557                 return;
1558             }
1559
1560             try {
1561                 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1562             } catch (Exception e) {
1563                 log.warn("Exception thrown handling port update", e);
1564             }
1565         }
1566     }
1567
1568     private final class InternalPortStatusEventListener
1569             implements ClusterMessageHandler {
1570         @Override
1571         public void handle(ClusterMessage message) {
1572
1573             log.debug("Received port status update event from peer: {}", message.sender());
1574             InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1575
1576             ProviderId providerId = event.providerId();
1577             DeviceId deviceId = event.deviceId();
1578             Timestamped<PortDescription> portDescription = event.portDescription();
1579
1580             if (getDevice(deviceId) == null) {
1581                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1582                 // Note: dropped information will be recovered by anti-entropy
1583                 return;
1584             }
1585
1586             try {
1587                 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1588             } catch (Exception e) {
1589                 log.warn("Exception thrown handling port update", e);
1590             }
1591         }
1592     }
1593
1594     private final class InternalDeviceAdvertisementListener
1595             implements ClusterMessageHandler {
1596         @Override
1597         public void handle(ClusterMessage message) {
1598             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1599             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1600             try {
1601                 handleAdvertisement(advertisement);
1602             } catch (Exception e) {
1603                 log.warn("Exception thrown handling Device advertisements.", e);
1604             }
1605         }
1606     }
1607
1608     private final class DeviceInjectedEventListener
1609             implements ClusterMessageHandler {
1610         @Override
1611         public void handle(ClusterMessage message) {
1612             log.debug("Received injected device event from peer: {}", message.sender());
1613             DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1614
1615             ProviderId providerId = event.providerId();
1616             DeviceId deviceId = event.deviceId();
1617             DeviceDescription deviceDescription = event.deviceDescription();
1618             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1619                 // workaround for ONOS-1208
1620                 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1621                 return;
1622             }
1623
1624             try {
1625                 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1626             } catch (Exception e) {
1627                 log.warn("Exception thrown handling device injected event.", e);
1628             }
1629         }
1630     }
1631
1632     private final class PortInjectedEventListener
1633             implements ClusterMessageHandler {
1634         @Override
1635         public void handle(ClusterMessage message) {
1636             log.debug("Received injected port event from peer: {}", message.sender());
1637             PortInjectedEvent event = SERIALIZER.decode(message.payload());
1638
1639             ProviderId providerId = event.providerId();
1640             DeviceId deviceId = event.deviceId();
1641             List<PortDescription> portDescriptions = event.portDescriptions();
1642             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1643                 // workaround for ONOS-1208
1644                 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1645                 return;
1646             }
1647
1648             try {
1649                 updatePorts(providerId, deviceId, portDescriptions);
1650             } catch (Exception e) {
1651                 log.warn("Exception thrown handling port injected event.", e);
1652             }
1653         }
1654     }
1655
1656     private class InternalPortStatsListener
1657             implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1658         @Override
1659         public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1660             if (event.type() == PUT) {
1661                 Device device = devices.get(event.key());
1662                 if (device != null) {
1663                     delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
1664                 }
1665             }
1666         }
1667     }
1668 }