6345643329dfbb610cb6dcf82eb2276a8a611f76
[onosfw.git] /
1 /*
2  * Copyright 2014-2015 Open Networking Laboratory
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 package org.onosproject.store.device.impl;
17
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
23
24 import org.apache.commons.lang3.RandomUtils;
25 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service;
31 import org.onlab.packet.ChassisId;
32 import org.onlab.util.KryoNamespace;
33 import org.onlab.util.NewConcurrentHashMap;
34 import org.onosproject.cluster.ClusterService;
35 import org.onosproject.cluster.ControllerNode;
36 import org.onosproject.cluster.NodeId;
37 import org.onosproject.mastership.MastershipService;
38 import org.onosproject.mastership.MastershipTerm;
39 import org.onosproject.mastership.MastershipTermService;
40 import org.onosproject.net.Annotations;
41 import org.onosproject.net.AnnotationsUtil;
42 import org.onosproject.net.DefaultAnnotations;
43 import org.onosproject.net.DefaultDevice;
44 import org.onosproject.net.DefaultPort;
45 import org.onosproject.net.Device;
46 import org.onosproject.net.Device.Type;
47 import org.onosproject.net.DeviceId;
48 import org.onosproject.net.MastershipRole;
49 import org.onosproject.net.OchPort;
50 import org.onosproject.net.OduCltPort;
51 import org.onosproject.net.OmsPort;
52 import org.onosproject.net.Port;
53 import org.onosproject.net.PortNumber;
54 import org.onosproject.net.device.DefaultPortStatistics;
55 import org.onosproject.net.device.DeviceClockService;
56 import org.onosproject.net.device.DeviceDescription;
57 import org.onosproject.net.device.DeviceEvent;
58 import org.onosproject.net.device.DeviceStore;
59 import org.onosproject.net.device.DeviceStoreDelegate;
60 import org.onosproject.net.device.OchPortDescription;
61 import org.onosproject.net.device.OduCltPortDescription;
62 import org.onosproject.net.device.OmsPortDescription;
63 import org.onosproject.net.device.PortDescription;
64 import org.onosproject.net.device.PortStatistics;
65 import org.onosproject.net.provider.ProviderId;
66 import org.onosproject.store.AbstractStore;
67 import org.onosproject.store.Timestamp;
68 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
69 import org.onosproject.store.cluster.messaging.ClusterMessage;
70 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
71 import org.onosproject.store.cluster.messaging.MessageSubject;
72 import org.onosproject.store.impl.Timestamped;
73 import org.onosproject.store.serializers.KryoNamespaces;
74 import org.onosproject.store.serializers.KryoSerializer;
75 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
76 import org.onosproject.store.service.EventuallyConsistentMap;
77 import org.onosproject.store.service.EventuallyConsistentMapEvent;
78 import org.onosproject.store.service.EventuallyConsistentMapListener;
79 import org.onosproject.store.service.MultiValuedTimestamp;
80 import org.onosproject.store.service.StorageService;
81 import org.onosproject.store.service.WallClockTimestamp;
82 import org.slf4j.Logger;
83
84 import java.io.IOException;
85 import java.util.ArrayList;
86 import java.util.Collection;
87 import java.util.Collections;
88 import java.util.HashMap;
89 import java.util.HashSet;
90 import java.util.Iterator;
91 import java.util.List;
92 import java.util.Map;
93 import java.util.Map.Entry;
94 import java.util.Objects;
95 import java.util.Set;
96 import java.util.concurrent.ConcurrentMap;
97 import java.util.concurrent.ExecutorService;
98 import java.util.concurrent.Executors;
99 import java.util.concurrent.ScheduledExecutorService;
100 import java.util.concurrent.TimeUnit;
101
102 import static com.google.common.base.Preconditions.checkArgument;
103 import static com.google.common.base.Predicates.notNull;
104 import static com.google.common.base.Verify.verify;
105 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
106 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
107 import static org.onlab.util.Tools.groupedThreads;
108 import static org.onlab.util.Tools.minPriority;
109 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
110 import static org.onosproject.net.DefaultAnnotations.merge;
111 import static org.onosproject.net.device.DeviceEvent.Type.*;
112 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
113 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
114 import static org.slf4j.LoggerFactory.getLogger;
115
116 /**
117  * Manages inventory of infrastructure devices using gossip protocol to distribute
118  * information.
119  */
120 @Component(immediate = true)
121 @Service
122 public class GossipDeviceStore
123         extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
124         implements DeviceStore {
125
126     private final Logger log = getLogger(getClass());
127
128     private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
129     // Timeout in milliseconds to process device or ports on remote master node
130     private static final int REMOTE_MASTER_TIMEOUT = 1000;
131
132     // innerMap is used to lock a Device, thus instance should never be replaced.
133     // collection of Description given from various providers
134     private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
135             deviceDescs = Maps.newConcurrentMap();
136
137     // cache of Device and Ports generated by compositing descriptions from providers
138     private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
139     private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
140
141     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
142     private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
143     private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
144             portStatsListener = new InternalPortStatsListener();
145
146     // to be updated under Device lock
147     private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
148     private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
149
150     // available(=UP) devices
151     private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
152
153     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154     protected DeviceClockService deviceClockService;
155
156     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157     protected StorageService storageService;
158
159     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160     protected ClusterCommunicationService clusterCommunicator;
161
162     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163     protected ClusterService clusterService;
164
165     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166     protected MastershipService mastershipService;
167
168     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169     protected MastershipTermService termService;
170
171
172     protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
173         @Override
174         protected void setupKryoPool() {
175             serializerPool = KryoNamespace.newBuilder()
176                     .register(DistributedStoreSerializers.STORE_COMMON)
177                     .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
178                     .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
179                     .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
180                     .register(InternalDeviceRemovedEvent.class)
181                     .register(new InternalPortEventSerializer(), InternalPortEvent.class)
182                     .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
183                     .register(DeviceAntiEntropyAdvertisement.class)
184                     .register(DeviceFragmentId.class)
185                     .register(PortFragmentId.class)
186                     .register(DeviceInjectedEvent.class)
187                     .register(PortInjectedEvent.class)
188                     .build();
189         }
190     };
191
192     private ExecutorService executor;
193
194     private ScheduledExecutorService backgroundExecutor;
195
196     // TODO make these anti-entropy parameters configurable
197     private long initialDelaySec = 5;
198     private long periodSec = 5;
199
200     @Activate
201     public void activate() {
202         executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
203
204         backgroundExecutor =
205                 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
206
207         clusterCommunicator.addSubscriber(
208                 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
209         clusterCommunicator.addSubscriber(
210                 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
211                 new InternalDeviceOfflineEventListener(),
212                 executor);
213         clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
214                                           new InternalRemoveRequestListener(),
215                                           executor);
216         clusterCommunicator.addSubscriber(
217                 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
218         clusterCommunicator.addSubscriber(
219                 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
220         clusterCommunicator.addSubscriber(
221                 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
222         clusterCommunicator.addSubscriber(
223                 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
224                 new InternalDeviceAdvertisementListener(),
225                 backgroundExecutor);
226         clusterCommunicator.addSubscriber(
227                 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
228         clusterCommunicator.addSubscriber(
229                 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
230
231         // start anti-entropy thread
232         backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
233                                                initialDelaySec, periodSec, TimeUnit.SECONDS);
234
235         // Create a distributed map for port stats.
236         KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
237                 .register(KryoNamespaces.API)
238                 .register(DefaultPortStatistics.class)
239                 .register(DeviceId.class)
240                 .register(MultiValuedTimestamp.class)
241                 .register(WallClockTimestamp.class);
242
243         devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
244                 .withName("port-stats")
245                 .withSerializer(deviceDataSerializer)
246                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
247                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
248                 .withTombstonesDisabled()
249                 .build();
250         devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
251                 eventuallyConsistentMapBuilder()
252                 .withName("port-stats-delta")
253                 .withSerializer(deviceDataSerializer)
254                 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
255                 .withTimestampProvider((k, v) -> new WallClockTimestamp())
256                 .withTombstonesDisabled()
257                 .build();
258         devicePortStats.addListener(portStatsListener);
259         log.info("Started");
260     }
261
262     @Deactivate
263     public void deactivate() {
264         devicePortStats.destroy();
265         devicePortDeltaStats.destroy();
266         executor.shutdownNow();
267
268         backgroundExecutor.shutdownNow();
269         try {
270             if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
271                 log.error("Timeout during executor shutdown");
272             }
273         } catch (InterruptedException e) {
274             log.error("Error during executor shutdown", e);
275         }
276
277         deviceDescs.clear();
278         devices.clear();
279         devicePorts.clear();
280         availableDevices.clear();
281         log.info("Stopped");
282     }
283
284     @Override
285     public int getDeviceCount() {
286         return devices.size();
287     }
288
289     @Override
290     public Iterable<Device> getDevices() {
291         return Collections.unmodifiableCollection(devices.values());
292     }
293
294     @Override
295     public Iterable<Device> getAvailableDevices() {
296         return FluentIterable.from(getDevices())
297                 .filter(input -> isAvailable(input.id()));
298     }
299
300     @Override
301     public Device getDevice(DeviceId deviceId) {
302         return devices.get(deviceId);
303     }
304
305     @Override
306     public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
307                                                          DeviceId deviceId,
308                                                          DeviceDescription deviceDescription) {
309         NodeId localNode = clusterService.getLocalNode().id();
310         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
311
312         // Process device update only if we're the master,
313         // otherwise signal the actual master.
314         DeviceEvent deviceEvent = null;
315         if (localNode.equals(deviceNode)) {
316
317             final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
318             final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
319             final Timestamped<DeviceDescription> mergedDesc;
320             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
321
322             synchronized (device) {
323                 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
324                 mergedDesc = device.get(providerId).getDeviceDesc();
325             }
326
327             if (deviceEvent != null) {
328                 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
329                          providerId, deviceId);
330                 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
331             }
332
333         } else {
334             // FIXME Temporary hack for NPE (ONOS-1171).
335             // Proper fix is to implement forwarding to master on ConfigProvider
336             // redo ONOS-490
337             if (deviceNode == null) {
338                 // silently ignore
339                 return null;
340             }
341
342
343             DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
344                     providerId, deviceId, deviceDescription);
345
346             // TODO check unicast return value
347             clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
348             /* error log:
349             log.warn("Failed to process injected device id: {} desc: {} " +
350                             "(cluster messaging failed: {})",
351                     deviceId, deviceDescription, e);
352             */
353         }
354
355         return deviceEvent;
356     }
357
358     private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
359                                                      DeviceId deviceId,
360                                                      Timestamped<DeviceDescription> deltaDesc) {
361
362         // Collection of DeviceDescriptions for a Device
363         Map<ProviderId, DeviceDescriptions> device
364                 = getOrCreateDeviceDescriptionsMap(deviceId);
365
366         synchronized (device) {
367             // locking per device
368
369             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
370                 log.debug("Ignoring outdated event: {}", deltaDesc);
371                 return null;
372             }
373
374             DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
375
376             final Device oldDevice = devices.get(deviceId);
377             final Device newDevice;
378
379             if (deltaDesc == descs.getDeviceDesc() ||
380                     deltaDesc.isNewer(descs.getDeviceDesc())) {
381                 // on new device or valid update
382                 descs.putDeviceDesc(deltaDesc);
383                 newDevice = composeDevice(deviceId, device);
384             } else {
385                 // outdated event, ignored.
386                 return null;
387             }
388             if (oldDevice == null) {
389                 // ADD
390                 return createDevice(providerId, newDevice, deltaDesc.timestamp());
391             } else {
392                 // UPDATE or ignore (no change or stale)
393                 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
394             }
395         }
396     }
397
398     // Creates the device and returns the appropriate event if necessary.
399     // Guarded by deviceDescs value (=Device lock)
400     private DeviceEvent createDevice(ProviderId providerId,
401                                      Device newDevice, Timestamp timestamp) {
402
403         // update composed device cache
404         Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
405         verify(oldDevice == null,
406                "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
407                providerId, oldDevice, newDevice);
408
409         if (!providerId.isAncillary()) {
410             markOnline(newDevice.id(), timestamp);
411         }
412
413         return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
414     }
415
416     // Updates the device and returns the appropriate event if necessary.
417     // Guarded by deviceDescs value (=Device lock)
418     private DeviceEvent updateDevice(ProviderId providerId,
419                                      Device oldDevice,
420                                      Device newDevice, Timestamp newTimestamp) {
421         // We allow only certain attributes to trigger update
422         boolean propertiesChanged =
423                 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
424                         !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
425                         !Objects.equals(oldDevice.providerId(), newDevice.providerId());
426         boolean annotationsChanged =
427                 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
428
429         // Primary providers can respond to all changes, but ancillary ones
430         // should respond only to annotation changes.
431         if ((providerId.isAncillary() && annotationsChanged) ||
432                 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
433             boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
434             if (!replaced) {
435                 verify(replaced,
436                        "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
437                        providerId, oldDevice, devices.get(newDevice.id())
438                         , newDevice);
439             }
440             if (!providerId.isAncillary()) {
441                 boolean wasOnline = availableDevices.contains(newDevice.id());
442                 markOnline(newDevice.id(), newTimestamp);
443                 if (!wasOnline) {
444                     notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
445                 }
446             }
447
448             return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
449         }
450         return null;
451     }
452
453     @Override
454     public DeviceEvent markOffline(DeviceId deviceId) {
455         final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
456         final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
457         if (event != null) {
458             log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
459                      deviceId, timestamp);
460             notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
461         }
462         return event;
463     }
464
465     private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
466
467         Map<ProviderId, DeviceDescriptions> providerDescs
468                 = getOrCreateDeviceDescriptionsMap(deviceId);
469
470         // locking device
471         synchronized (providerDescs) {
472
473             // accept off-line if given timestamp is newer than
474             // the latest Timestamp from Primary provider
475             DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
476             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
477             if (timestamp.compareTo(lastTimestamp) <= 0) {
478                 // outdated event ignore
479                 return null;
480             }
481
482             offline.put(deviceId, timestamp);
483
484             Device device = devices.get(deviceId);
485             if (device == null) {
486                 return null;
487             }
488             boolean removed = availableDevices.remove(deviceId);
489             if (removed) {
490                 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
491             }
492             return null;
493         }
494     }
495
496     /**
497      * Marks the device as available if the given timestamp is not outdated,
498      * compared to the time the device has been marked offline.
499      *
500      * @param deviceId  identifier of the device
501      * @param timestamp of the event triggering this change.
502      * @return true if availability change request was accepted and changed the state
503      */
504     // Guarded by deviceDescs value (=Device lock)
505     private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
506         // accept on-line if given timestamp is newer than
507         // the latest offline request Timestamp
508         Timestamp offlineTimestamp = offline.get(deviceId);
509         if (offlineTimestamp == null ||
510                 offlineTimestamp.compareTo(timestamp) < 0) {
511
512             offline.remove(deviceId);
513             return availableDevices.add(deviceId);
514         }
515         return false;
516     }
517
518     @Override
519     public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
520                                                       DeviceId deviceId,
521                                                       List<PortDescription> portDescriptions) {
522
523         NodeId localNode = clusterService.getLocalNode().id();
524         // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
525         // since it will trigger distributed store read.
526         // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
527         // outside Device subsystem. so that we don't have to modify both Device and Link stores.
528         // If we don't care much about topology performance, then it might be OK.
529         NodeId deviceNode = mastershipService.getMasterFor(deviceId);
530
531         // Process port update only if we're the master of the device,
532         // otherwise signal the actual master.
533         List<DeviceEvent> deviceEvents = null;
534         if (localNode.equals(deviceNode)) {
535
536             final Timestamp newTimestamp;
537             try {
538                 newTimestamp = deviceClockService.getTimestamp(deviceId);
539             } catch (IllegalStateException e) {
540                 log.info("Timestamp was not available for device {}", deviceId);
541                 log.debug("  discarding {}", portDescriptions);
542                 // Failed to generate timestamp.
543
544                 // Possible situation:
545                 //  Device connected and became master for short period of time,
546                 // but lost mastership before this instance had the chance to
547                 // retrieve term information.
548
549                 // Information dropped here is expected to be recoverable by
550                 // device probing after mastership change
551
552                 return Collections.emptyList();
553             }
554             log.debug("timestamp for {} {}", deviceId, newTimestamp);
555
556             final Timestamped<List<PortDescription>> timestampedInput
557                     = new Timestamped<>(portDescriptions, newTimestamp);
558             final Timestamped<List<PortDescription>> merged;
559
560             final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
561
562             synchronized (device) {
563                 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
564                 final DeviceDescriptions descs = device.get(providerId);
565                 List<PortDescription> mergedList =
566                         FluentIterable.from(portDescriptions)
567                                 .transform(new Function<PortDescription, PortDescription>() {
568                                     @Override
569                                     public PortDescription apply(PortDescription input) {
570                                         // lookup merged port description
571                                         return descs.getPortDesc(input.portNumber()).value();
572                                     }
573                                 }).toList();
574                 merged = new Timestamped<>(mergedList, newTimestamp);
575             }
576
577             if (!deviceEvents.isEmpty()) {
578                 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
579                          providerId, deviceId);
580                 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
581             }
582
583         } else {
584             // FIXME Temporary hack for NPE (ONOS-1171).
585             // Proper fix is to implement forwarding to master on ConfigProvider
586             // redo ONOS-490
587             if (deviceNode == null) {
588                 // silently ignore
589                 return Collections.emptyList();
590             }
591
592             PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
593
594             //TODO check unicast return value
595             clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
596             /* error log:
597             log.warn("Failed to process injected ports of device id: {} " +
598                             "(cluster messaging failed: {})",
599                     deviceId, e);
600             */
601         }
602
603         return deviceEvents == null ? Collections.emptyList() : deviceEvents;
604     }
605
606     private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
607                                                   DeviceId deviceId,
608                                                   Timestamped<List<PortDescription>> portDescriptions) {
609
610         Device device = devices.get(deviceId);
611         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
612
613         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
614         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
615
616         List<DeviceEvent> events = new ArrayList<>();
617         synchronized (descsMap) {
618
619             if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
620                 log.debug("Ignoring outdated events: {}", portDescriptions);
621                 return Collections.emptyList();
622             }
623
624             DeviceDescriptions descs = descsMap.get(providerId);
625             // every provider must provide DeviceDescription.
626             checkArgument(descs != null,
627                           "Device description for Device ID %s from Provider %s was not found",
628                           deviceId, providerId);
629
630             Map<PortNumber, Port> ports = getPortMap(deviceId);
631
632             final Timestamp newTimestamp = portDescriptions.timestamp();
633
634             // Add new ports
635             Set<PortNumber> processed = new HashSet<>();
636             for (PortDescription portDescription : portDescriptions.value()) {
637                 final PortNumber number = portDescription.portNumber();
638                 processed.add(number);
639
640                 final Port oldPort = ports.get(number);
641                 final Port newPort;
642
643
644                 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
645                 if (existingPortDesc == null ||
646                         newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
647                     // on new port or valid update
648                     // update description
649                     descs.putPortDesc(new Timestamped<>(portDescription,
650                                                         portDescriptions.timestamp()));
651                     newPort = composePort(device, number, descsMap);
652                 } else {
653                     // outdated event, ignored.
654                     continue;
655                 }
656
657                 events.add(oldPort == null ?
658                                    createPort(device, newPort, ports) :
659                                    updatePort(device, oldPort, newPort, ports));
660             }
661
662             events.addAll(pruneOldPorts(device, ports, processed));
663         }
664         return FluentIterable.from(events).filter(notNull()).toList();
665     }
666
667     // Creates a new port based on the port description adds it to the map and
668     // Returns corresponding event.
669     // Guarded by deviceDescs value (=Device lock)
670     private DeviceEvent createPort(Device device, Port newPort,
671                                    Map<PortNumber, Port> ports) {
672         ports.put(newPort.number(), newPort);
673         return new DeviceEvent(PORT_ADDED, device, newPort);
674     }
675
676     // Checks if the specified port requires update and if so, it replaces the
677     // existing entry in the map and returns corresponding event.
678     // Guarded by deviceDescs value (=Device lock)
679     private DeviceEvent updatePort(Device device, Port oldPort,
680                                    Port newPort,
681                                    Map<PortNumber, Port> ports) {
682         if (oldPort.isEnabled() != newPort.isEnabled() ||
683                 oldPort.type() != newPort.type() ||
684                 oldPort.portSpeed() != newPort.portSpeed() ||
685                 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
686             ports.put(oldPort.number(), newPort);
687             return new DeviceEvent(PORT_UPDATED, device, newPort);
688         }
689         return null;
690     }
691
692     // Prunes the specified list of ports based on which ports are in the
693     // processed list and returns list of corresponding events.
694     // Guarded by deviceDescs value (=Device lock)
695     private List<DeviceEvent> pruneOldPorts(Device device,
696                                             Map<PortNumber, Port> ports,
697                                             Set<PortNumber> processed) {
698         List<DeviceEvent> events = new ArrayList<>();
699         Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
700         while (iterator.hasNext()) {
701             Entry<PortNumber, Port> e = iterator.next();
702             PortNumber portNumber = e.getKey();
703             if (!processed.contains(portNumber)) {
704                 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
705                 iterator.remove();
706             }
707         }
708         return events;
709     }
710
711     // Gets the map of ports for the specified device; if one does not already
712     // exist, it creates and registers a new one.
713     private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
714         return createIfAbsentUnchecked(devicePorts, deviceId,
715                                        NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
716     }
717
718     private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
719             DeviceId deviceId) {
720         Map<ProviderId, DeviceDescriptions> r;
721         r = deviceDescs.get(deviceId);
722         if (r == null) {
723             r = new HashMap<>();
724             final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
725             concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
726             if (concurrentlyAdded != null) {
727                 r = concurrentlyAdded;
728             }
729         }
730         return r;
731     }
732
733     // Guarded by deviceDescs value (=Device lock)
734     private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
735             Map<ProviderId, DeviceDescriptions> device,
736             ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
737         synchronized (device) {
738             DeviceDescriptions r = device.get(providerId);
739             if (r == null) {
740                 r = new DeviceDescriptions(deltaDesc);
741                 device.put(providerId, r);
742             }
743             return r;
744         }
745     }
746
747     @Override
748     public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
749                                                      DeviceId deviceId,
750                                                      PortDescription portDescription) {
751         final Timestamp newTimestamp;
752         try {
753             newTimestamp = deviceClockService.getTimestamp(deviceId);
754         } catch (IllegalStateException e) {
755             log.info("Timestamp was not available for device {}", deviceId);
756             log.debug("  discarding {}", portDescription);
757             // Failed to generate timestamp. Ignoring.
758             // See updatePorts comment
759             return null;
760         }
761         final Timestamped<PortDescription> deltaDesc
762                 = new Timestamped<>(portDescription, newTimestamp);
763         final DeviceEvent event;
764         final Timestamped<PortDescription> mergedDesc;
765         final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
766         synchronized (device) {
767             event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
768             mergedDesc = device.get(providerId)
769                     .getPortDesc(portDescription.portNumber());
770         }
771         if (event != null) {
772             log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
773                      providerId, deviceId);
774             notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
775         }
776         return event;
777     }
778
779     private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
780                                                  Timestamped<PortDescription> deltaDesc) {
781         Device device = devices.get(deviceId);
782         checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
783
784         Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
785         checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
786
787         synchronized (descsMap) {
788
789             if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
790                 log.debug("Ignoring outdated event: {}", deltaDesc);
791                 return null;
792             }
793
794             DeviceDescriptions descs = descsMap.get(providerId);
795             // assuming all providers must to give DeviceDescription
796             verify(descs != null,
797                    "Device description for Device ID %s from Provider %s was not found",
798                    deviceId, providerId);
799
800             ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
801             final PortNumber number = deltaDesc.value().portNumber();
802             final Port oldPort = ports.get(number);
803             final Port newPort;
804
805             final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
806             if (existingPortDesc == null ||
807                     deltaDesc.isNewer(existingPortDesc)) {
808                 // on new port or valid update
809                 // update description
810                 descs.putPortDesc(deltaDesc);
811                 newPort = composePort(device, number, descsMap);
812             } else {
813                 // same or outdated event, ignored.
814                 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
815                 return null;
816             }
817
818             if (oldPort == null) {
819                 return createPort(device, newPort, ports);
820             } else {
821                 return updatePort(device, oldPort, newPort, ports);
822             }
823         }
824     }
825
826     @Override
827     public List<Port> getPorts(DeviceId deviceId) {
828         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
829         if (ports == null) {
830             return Collections.emptyList();
831         }
832         return ImmutableList.copyOf(ports.values());
833     }
834
835     @Override
836     public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
837                                             Collection<PortStatistics> newStatsCollection) {
838
839         Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
840         Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
841         Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
842
843         if (prvStatsMap != null) {
844             for (PortStatistics newStats : newStatsCollection) {
845                 PortNumber port = PortNumber.portNumber(newStats.port());
846                 PortStatistics prvStats = prvStatsMap.get(port);
847                 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
848                 PortStatistics deltaStats = builder.build();
849                 if (prvStats != null) {
850                     deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
851                 }
852                 deltaStatsMap.put(port, deltaStats);
853                 newStatsMap.put(port, newStats);
854             }
855         } else {
856             for (PortStatistics newStats : newStatsCollection) {
857                 PortNumber port = PortNumber.portNumber(newStats.port());
858                 newStatsMap.put(port, newStats);
859             }
860         }
861         devicePortDeltaStats.put(deviceId, deltaStatsMap);
862         devicePortStats.put(deviceId, newStatsMap);
863         // DeviceEvent returns null because of InternalPortStatsListener usage
864         return null;
865     }
866
867     /**
868      * Calculate delta statistics by subtracting previous from new statistics.
869      *
870      * @param deviceId device identifier
871      * @param prvStats previous port statistics
872      * @param newStats new port statistics
873      * @return PortStatistics
874      */
875     public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
876         // calculate time difference
877         long deltaStatsSec, deltaStatsNano;
878         if (newStats.durationNano() < prvStats.durationNano()) {
879             deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
880             deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
881         } else {
882             deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
883             deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
884         }
885         DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
886         DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
887                 .setPort(newStats.port())
888                 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
889                 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
890                 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
891                 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
892                 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
893                 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
894                 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
895                 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
896                 .setDurationSec(deltaStatsSec)
897                 .setDurationNano(deltaStatsNano)
898                 .build();
899         return deltaStats;
900     }
901
902     @Override
903     public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
904         Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
905         if (portStats == null) {
906             return Collections.emptyList();
907         }
908         return ImmutableList.copyOf(portStats.values());
909     }
910
911     @Override
912     public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
913         Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
914         if (portStats == null) {
915             return Collections.emptyList();
916         }
917         return ImmutableList.copyOf(portStats.values());
918     }
919
920     @Override
921     public Port getPort(DeviceId deviceId, PortNumber portNumber) {
922         Map<PortNumber, Port> ports = devicePorts.get(deviceId);
923         return ports == null ? null : ports.get(portNumber);
924     }
925
926     @Override
927     public boolean isAvailable(DeviceId deviceId) {
928         return availableDevices.contains(deviceId);
929     }
930
931     @Override
932     public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
933         final NodeId myId = clusterService.getLocalNode().id();
934         NodeId master = mastershipService.getMasterFor(deviceId);
935
936         // if there exist a master, forward
937         // if there is no master, try to become one and process
938
939         boolean relinquishAtEnd = false;
940         if (master == null) {
941             final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
942             if (myRole != MastershipRole.NONE) {
943                 relinquishAtEnd = true;
944             }
945             log.debug("Temporarily requesting role for {} to remove", deviceId);
946             mastershipService.requestRoleFor(deviceId);
947             MastershipTerm term = termService.getMastershipTerm(deviceId);
948             if (term != null && myId.equals(term.master())) {
949                 master = myId;
950             }
951         }
952
953         if (!myId.equals(master)) {
954             log.debug("{} has control of {}, forwarding remove request",
955                       master, deviceId);
956
957             // TODO check unicast return value
958             clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
959              /* error log:
960              log.error("Failed to forward {} remove request to {}", deviceId, master, e);
961              */
962
963             // event will be triggered after master processes it.
964             return null;
965         }
966
967         // I have control..
968
969         Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
970         DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
971         if (event != null) {
972             log.debug("Notifying peers of a device removed topology event for deviceId: {}",
973                       deviceId);
974             notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
975         }
976         if (relinquishAtEnd) {
977             log.debug("Relinquishing temporary role acquired for {}", deviceId);
978             mastershipService.relinquishMastership(deviceId);
979         }
980         return event;
981     }
982
983     private DeviceEvent removeDeviceInternal(DeviceId deviceId,
984                                              Timestamp timestamp) {
985
986         Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
987         synchronized (descs) {
988             // accept removal request if given timestamp is newer than
989             // the latest Timestamp from Primary provider
990             DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
991             Timestamp lastTimestamp = primDescs.getLatestTimestamp();
992             if (timestamp.compareTo(lastTimestamp) <= 0) {
993                 // outdated event ignore
994                 return null;
995             }
996             removalRequest.put(deviceId, timestamp);
997
998             Device device = devices.remove(deviceId);
999             // should DEVICE_REMOVED carry removed ports?
1000             Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1001             if (ports != null) {
1002                 ports.clear();
1003             }
1004             markOfflineInternal(deviceId, timestamp);
1005             descs.clear();
1006             return device == null ? null :
1007                     new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1008         }
1009     }
1010
1011     /**
1012      * Checks if given timestamp is superseded by removal request
1013      * with more recent timestamp.
1014      *
1015      * @param deviceId         identifier of a device
1016      * @param timestampToCheck timestamp of an event to check
1017      * @return true if device is already removed
1018      */
1019     private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1020         Timestamp removalTimestamp = removalRequest.get(deviceId);
1021         if (removalTimestamp != null &&
1022                 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1023             // removalRequest is more recent
1024             return true;
1025         }
1026         return false;
1027     }
1028
1029     /**
1030      * Returns a Device, merging description given from multiple Providers.
1031      *
1032      * @param deviceId      device identifier
1033      * @param providerDescs Collection of Descriptions from multiple providers
1034      * @return Device instance
1035      */
1036     private Device composeDevice(DeviceId deviceId,
1037                                  Map<ProviderId, DeviceDescriptions> providerDescs) {
1038
1039         checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1040
1041         ProviderId primary = pickPrimaryPID(providerDescs);
1042
1043         DeviceDescriptions desc = providerDescs.get(primary);
1044
1045         final DeviceDescription base = desc.getDeviceDesc().value();
1046         Type type = base.type();
1047         String manufacturer = base.manufacturer();
1048         String hwVersion = base.hwVersion();
1049         String swVersion = base.swVersion();
1050         String serialNumber = base.serialNumber();
1051         ChassisId chassisId = base.chassisId();
1052         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1053         annotations = merge(annotations, base.annotations());
1054
1055         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1056             if (e.getKey().equals(primary)) {
1057                 continue;
1058             }
1059             // Note: should keep track of Description timestamp in the future
1060             // and only merge conflicting keys when timestamp is newer.
1061             // Currently assuming there will never be a key conflict between
1062             // providers
1063
1064             // annotation merging. not so efficient, should revisit later
1065             annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1066         }
1067
1068         return new DefaultDevice(primary, deviceId, type, manufacturer,
1069                                  hwVersion, swVersion, serialNumber,
1070                                  chassisId, annotations);
1071     }
1072
1073     private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1074                                  PortDescription description, Annotations annotations) {
1075         switch (description.type()) {
1076             case OMS:
1077                 OmsPortDescription omsDesc = (OmsPortDescription) description;
1078                 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1079                         omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1080             case OCH:
1081                 OchPortDescription ochDesc = (OchPortDescription) description;
1082                 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1083                         ochDesc.isTunable(), ochDesc.lambda(), annotations);
1084             case ODUCLT:
1085                 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1086                 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1087             default:
1088                 return new DefaultPort(device, number, isEnabled, description.type(),
1089                         description.portSpeed(), annotations);
1090         }
1091     }
1092
1093     /**
1094      * Returns a Port, merging description given from multiple Providers.
1095      *
1096      * @param device   device the port is on
1097      * @param number   port number
1098      * @param descsMap Collection of Descriptions from multiple providers
1099      * @return Port instance
1100      */
1101     private Port composePort(Device device, PortNumber number,
1102                              Map<ProviderId, DeviceDescriptions> descsMap) {
1103
1104         ProviderId primary = pickPrimaryPID(descsMap);
1105         DeviceDescriptions primDescs = descsMap.get(primary);
1106         // if no primary, assume not enabled
1107         boolean isEnabled = false;
1108         DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1109         Timestamp newest = null;
1110         final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1111         if (portDesc != null) {
1112             isEnabled = portDesc.value().isEnabled();
1113             annotations = merge(annotations, portDesc.value().annotations());
1114             newest = portDesc.timestamp();
1115         }
1116         Port updated = null;
1117         for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1118             if (e.getKey().equals(primary)) {
1119                 continue;
1120             }
1121             // Note: should keep track of Description timestamp in the future
1122             // and only merge conflicting keys when timestamp is newer.
1123             // Currently assuming there will never be a key conflict between
1124             // providers
1125
1126             // annotation merging. not so efficient, should revisit later
1127             final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1128             if (otherPortDesc != null) {
1129                 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1130                     continue;
1131                 }
1132                 annotations = merge(annotations, otherPortDesc.value().annotations());
1133                 PortDescription other = otherPortDesc.value();
1134                 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1135                 newest = otherPortDesc.timestamp();
1136             }
1137         }
1138         if (portDesc == null) {
1139             return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1140         }
1141         PortDescription current = portDesc.value();
1142         return updated == null
1143                 ? buildTypedPort(device, number, isEnabled, current, annotations)
1144                 : updated;
1145     }
1146
1147     /**
1148      * @return primary ProviderID, or randomly chosen one if none exists
1149      */
1150     private ProviderId pickPrimaryPID(
1151             Map<ProviderId, DeviceDescriptions> providerDescs) {
1152         ProviderId fallBackPrimary = null;
1153         for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1154             if (!e.getKey().isAncillary()) {
1155                 return e.getKey();
1156             } else if (fallBackPrimary == null) {
1157                 // pick randomly as a fallback in case there is no primary
1158                 fallBackPrimary = e.getKey();
1159             }
1160         }
1161         return fallBackPrimary;
1162     }
1163
1164     private DeviceDescriptions getPrimaryDescriptions(
1165             Map<ProviderId, DeviceDescriptions> providerDescs) {
1166         ProviderId pid = pickPrimaryPID(providerDescs);
1167         return providerDescs.get(pid);
1168     }
1169
1170     private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1171         clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1172     }
1173
1174     private void broadcastMessage(MessageSubject subject, Object event) {
1175         clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1176     }
1177
1178     private void notifyPeers(InternalDeviceEvent event) {
1179         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1180     }
1181
1182     private void notifyPeers(InternalDeviceOfflineEvent event) {
1183         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1184     }
1185
1186     private void notifyPeers(InternalDeviceRemovedEvent event) {
1187         broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1188     }
1189
1190     private void notifyPeers(InternalPortEvent event) {
1191         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1192     }
1193
1194     private void notifyPeers(InternalPortStatusEvent event) {
1195         broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1196     }
1197
1198     private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1199         try {
1200             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1201         } catch (IOException e) {
1202             log.error("Failed to send" + event + " to " + recipient, e);
1203         }
1204     }
1205
1206     private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1207         try {
1208             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1209         } catch (IOException e) {
1210             log.error("Failed to send" + event + " to " + recipient, e);
1211         }
1212     }
1213
1214     private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1215         try {
1216             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1217         } catch (IOException e) {
1218             log.error("Failed to send" + event + " to " + recipient, e);
1219         }
1220     }
1221
1222     private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1223         try {
1224             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1225         } catch (IOException e) {
1226             log.error("Failed to send" + event + " to " + recipient, e);
1227         }
1228     }
1229
1230     private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1231         try {
1232             unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1233         } catch (IOException e) {
1234             log.error("Failed to send" + event + " to " + recipient, e);
1235         }
1236     }
1237
1238     private DeviceAntiEntropyAdvertisement createAdvertisement() {
1239         final NodeId self = clusterService.getLocalNode().id();
1240
1241         final int numDevices = deviceDescs.size();
1242         Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1243         final int portsPerDevice = 8; // random factor to minimize reallocation
1244         Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1245         Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1246
1247         deviceDescs.forEach((deviceId, devDescs) -> {
1248
1249             // for each Device...
1250             synchronized (devDescs) {
1251
1252                 // send device offline timestamp
1253                 Timestamp lOffline = this.offline.get(deviceId);
1254                 if (lOffline != null) {
1255                     adOffline.put(deviceId, lOffline);
1256                 }
1257
1258                 for (Entry<ProviderId, DeviceDescriptions>
1259                         prov : devDescs.entrySet()) {
1260
1261                     // for each Provider Descriptions...
1262                     final ProviderId provId = prov.getKey();
1263                     final DeviceDescriptions descs = prov.getValue();
1264
1265                     adDevices.put(new DeviceFragmentId(deviceId, provId),
1266                                   descs.getDeviceDesc().timestamp());
1267
1268                     for (Entry<PortNumber, Timestamped<PortDescription>>
1269                             portDesc : descs.getPortDescs().entrySet()) {
1270
1271                         final PortNumber number = portDesc.getKey();
1272                         adPorts.put(new PortFragmentId(deviceId, provId, number),
1273                                     portDesc.getValue().timestamp());
1274                     }
1275                 }
1276             }
1277         });
1278
1279         return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1280     }
1281
1282     /**
1283      * Responds to anti-entropy advertisement message.
1284      * <p/>
1285      * Notify sender about out-dated information using regular replication message.
1286      * Send back advertisement to sender if not in sync.
1287      *
1288      * @param advertisement to respond to
1289      */
1290     private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1291
1292         final NodeId sender = advertisement.sender();
1293
1294         Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1295         Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1296         Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1297
1298         // Fragments to request
1299         Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1300         Collection<PortFragmentId> reqPorts = new ArrayList<>();
1301
1302         for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1303             final DeviceId deviceId = de.getKey();
1304             final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1305
1306             synchronized (lDevice) {
1307                 // latestTimestamp across provider
1308                 // Note: can be null initially
1309                 Timestamp localLatest = offline.get(deviceId);
1310
1311                 // handle device Ads
1312                 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1313                     final ProviderId provId = prov.getKey();
1314                     final DeviceDescriptions lDeviceDescs = prov.getValue();
1315
1316                     final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1317
1318
1319                     Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1320                     Timestamp advDevTimestamp = devAds.get(devFragId);
1321
1322                     if (advDevTimestamp == null || lProvDevice.isNewerThan(
1323                             advDevTimestamp)) {
1324                         // remote does not have it or outdated, suggest
1325                         notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1326                     } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1327                         // local is outdated, request
1328                         reqDevices.add(devFragId);
1329                     }
1330
1331                     // handle port Ads
1332                     for (Entry<PortNumber, Timestamped<PortDescription>>
1333                             pe : lDeviceDescs.getPortDescs().entrySet()) {
1334
1335                         final PortNumber num = pe.getKey();
1336                         final Timestamped<PortDescription> lPort = pe.getValue();
1337
1338                         final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1339
1340                         Timestamp advPortTimestamp = portAds.get(portFragId);
1341                         if (advPortTimestamp == null || lPort.isNewerThan(
1342                                 advPortTimestamp)) {
1343                             // remote does not have it or outdated, suggest
1344                             notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1345                         } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1346                             // local is outdated, request
1347                             log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1348                             reqPorts.add(portFragId);
1349                         }
1350
1351                         // remove port Ad already processed
1352                         portAds.remove(portFragId);
1353                     } // end local port loop
1354
1355                     // remove device Ad already processed
1356                     devAds.remove(devFragId);
1357
1358                     // find latest and update
1359                     final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1360                     if (localLatest == null ||
1361                             providerLatest.compareTo(localLatest) > 0) {
1362                         localLatest = providerLatest;
1363                     }
1364                 } // end local provider loop
1365
1366                 // checking if remote timestamp is more recent.
1367                 Timestamp rOffline = offlineAds.get(deviceId);
1368                 if (rOffline != null &&
1369                         rOffline.compareTo(localLatest) > 0) {
1370                     // remote offline timestamp suggests that the
1371                     // device is off-line
1372                     markOfflineInternal(deviceId, rOffline);
1373                 }
1374
1375                 Timestamp lOffline = offline.get(deviceId);
1376                 if (lOffline != null && rOffline == null) {
1377                     // locally offline, but remote is online, suggest offline
1378                     notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1379                 }
1380
1381                 // remove device offline Ad already processed
1382                 offlineAds.remove(deviceId);
1383             } // end local device loop
1384         } // device lock
1385
1386         // If there is any Ads left, request them
1387         log.trace("Ads left {}, {}", devAds, portAds);
1388         reqDevices.addAll(devAds.keySet());
1389         reqPorts.addAll(portAds.keySet());
1390
1391         if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1392             log.trace("Nothing to request to remote peer {}", sender);
1393             return;
1394         }
1395
1396         log.debug("Need to sync {} {}", reqDevices, reqPorts);
1397
1398         // 2-way Anti-Entropy for now
1399         try {
1400             unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1401         } catch (IOException e) {
1402             log.error("Failed to send response advertisement to " + sender, e);
1403         }
1404
1405 // Sketch of 3-way Anti-Entropy
1406 //        DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1407 //        ClusterMessage message = new ClusterMessage(
1408 //                clusterService.getLocalNode().id(),
1409 //                GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1410 //                SERIALIZER.encode(request));
1411 //
1412 //        try {
1413 //            clusterCommunicator.unicast(message, advertisement.sender());
1414 //        } catch (IOException e) {
1415 //            log.error("Failed to send advertisement reply to "
1416 //                      + advertisement.sender(), e);
1417 //        }
1418     }
1419
1420     private void notifyDelegateIfNotNull(DeviceEvent event) {
1421         if (event != null) {
1422             notifyDelegate(event);
1423         }
1424     }
1425
1426     private final class SendAdvertisementTask implements Runnable {
1427
1428         @Override
1429         public void run() {
1430             if (Thread.currentThread().isInterrupted()) {
1431                 log.debug("Interrupted, quitting");
1432                 return;
1433             }
1434
1435             try {
1436                 final NodeId self = clusterService.getLocalNode().id();
1437                 Set<ControllerNode> nodes = clusterService.getNodes();
1438
1439                 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1440                         .transform(toNodeId())
1441                         .toList();
1442
1443                 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1444                     log.trace("No other peers in the cluster.");
1445                     return;
1446                 }
1447
1448                 NodeId peer;
1449                 do {
1450                     int idx = RandomUtils.nextInt(0, nodeIds.size());
1451                     peer = nodeIds.get(idx);
1452                 } while (peer.equals(self));
1453
1454                 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1455
1456                 if (Thread.currentThread().isInterrupted()) {
1457                     log.debug("Interrupted, quitting");
1458                     return;
1459                 }
1460
1461                 try {
1462                     unicastMessage(peer, DEVICE_ADVERTISE, ad);
1463                 } catch (IOException e) {
1464                     log.debug("Failed to send anti-entropy advertisement to {}", peer);
1465                     return;
1466                 }
1467             } catch (Exception e) {
1468                 // catch all Exception to avoid Scheduled task being suppressed.
1469                 log.error("Exception thrown while sending advertisement", e);
1470             }
1471         }
1472     }
1473
1474     private final class InternalDeviceEventListener
1475             implements ClusterMessageHandler {
1476         @Override
1477         public void handle(ClusterMessage message) {
1478             log.debug("Received device update event from peer: {}", message.sender());
1479             InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1480
1481             ProviderId providerId = event.providerId();
1482             DeviceId deviceId = event.deviceId();
1483             Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1484
1485             try {
1486                 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1487             } catch (Exception e) {
1488                 log.warn("Exception thrown handling device update", e);
1489             }
1490         }
1491     }
1492
1493     private final class InternalDeviceOfflineEventListener
1494             implements ClusterMessageHandler {
1495         @Override
1496         public void handle(ClusterMessage message) {
1497             log.debug("Received device offline event from peer: {}", message.sender());
1498             InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1499
1500             DeviceId deviceId = event.deviceId();
1501             Timestamp timestamp = event.timestamp();
1502
1503             try {
1504                 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1505             } catch (Exception e) {
1506                 log.warn("Exception thrown handling device offline", e);
1507             }
1508         }
1509     }
1510
1511     private final class InternalRemoveRequestListener
1512             implements ClusterMessageHandler {
1513         @Override
1514         public void handle(ClusterMessage message) {
1515             log.debug("Received device remove request from peer: {}", message.sender());
1516             DeviceId did = SERIALIZER.decode(message.payload());
1517
1518             try {
1519                 removeDevice(did);
1520             } catch (Exception e) {
1521                 log.warn("Exception thrown handling device remove", e);
1522             }
1523         }
1524     }
1525
1526     private final class InternalDeviceRemovedEventListener
1527             implements ClusterMessageHandler {
1528         @Override
1529         public void handle(ClusterMessage message) {
1530             log.debug("Received device removed event from peer: {}", message.sender());
1531             InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1532
1533             DeviceId deviceId = event.deviceId();
1534             Timestamp timestamp = event.timestamp();
1535
1536             try {
1537                 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1538             } catch (Exception e) {
1539                 log.warn("Exception thrown handling device removed", e);
1540             }
1541         }
1542     }
1543
1544     private final class InternalPortEventListener
1545             implements ClusterMessageHandler {
1546         @Override
1547         public void handle(ClusterMessage message) {
1548
1549             log.debug("Received port update event from peer: {}", message.sender());
1550             InternalPortEvent event = SERIALIZER.decode(message.payload());
1551
1552             ProviderId providerId = event.providerId();
1553             DeviceId deviceId = event.deviceId();
1554             Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1555
1556             if (getDevice(deviceId) == null) {
1557                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1558                 // Note: dropped information will be recovered by anti-entropy
1559                 return;
1560             }
1561
1562             try {
1563                 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1564             } catch (Exception e) {
1565                 log.warn("Exception thrown handling port update", e);
1566             }
1567         }
1568     }
1569
1570     private final class InternalPortStatusEventListener
1571             implements ClusterMessageHandler {
1572         @Override
1573         public void handle(ClusterMessage message) {
1574
1575             log.debug("Received port status update event from peer: {}", message.sender());
1576             InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1577
1578             ProviderId providerId = event.providerId();
1579             DeviceId deviceId = event.deviceId();
1580             Timestamped<PortDescription> portDescription = event.portDescription();
1581
1582             if (getDevice(deviceId) == null) {
1583                 log.debug("{} not found on this node yet, ignoring.", deviceId);
1584                 // Note: dropped information will be recovered by anti-entropy
1585                 return;
1586             }
1587
1588             try {
1589                 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1590             } catch (Exception e) {
1591                 log.warn("Exception thrown handling port update", e);
1592             }
1593         }
1594     }
1595
1596     private final class InternalDeviceAdvertisementListener
1597             implements ClusterMessageHandler {
1598         @Override
1599         public void handle(ClusterMessage message) {
1600             log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1601             DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1602             try {
1603                 handleAdvertisement(advertisement);
1604             } catch (Exception e) {
1605                 log.warn("Exception thrown handling Device advertisements.", e);
1606             }
1607         }
1608     }
1609
1610     private final class DeviceInjectedEventListener
1611             implements ClusterMessageHandler {
1612         @Override
1613         public void handle(ClusterMessage message) {
1614             log.debug("Received injected device event from peer: {}", message.sender());
1615             DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1616
1617             ProviderId providerId = event.providerId();
1618             DeviceId deviceId = event.deviceId();
1619             DeviceDescription deviceDescription = event.deviceDescription();
1620             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1621                 // workaround for ONOS-1208
1622                 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1623                 return;
1624             }
1625
1626             try {
1627                 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1628             } catch (Exception e) {
1629                 log.warn("Exception thrown handling device injected event.", e);
1630             }
1631         }
1632     }
1633
1634     private final class PortInjectedEventListener
1635             implements ClusterMessageHandler {
1636         @Override
1637         public void handle(ClusterMessage message) {
1638             log.debug("Received injected port event from peer: {}", message.sender());
1639             PortInjectedEvent event = SERIALIZER.decode(message.payload());
1640
1641             ProviderId providerId = event.providerId();
1642             DeviceId deviceId = event.deviceId();
1643             List<PortDescription> portDescriptions = event.portDescriptions();
1644             if (!deviceClockService.isTimestampAvailable(deviceId)) {
1645                 // workaround for ONOS-1208
1646                 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1647                 return;
1648             }
1649
1650             try {
1651                 updatePorts(providerId, deviceId, portDescriptions);
1652             } catch (Exception e) {
1653                 log.warn("Exception thrown handling port injected event.", e);
1654             }
1655         }
1656     }
1657
1658     private class InternalPortStatsListener
1659             implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1660         @Override
1661         public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1662             if (event.type() == PUT) {
1663                 Device device = devices.get(event.key());
1664                 if (device != null) {
1665                     delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));
1666                 }
1667             }
1668         }
1669     }
1670 }