2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.device.impl;
18 import com.google.common.base.Function;
19 import com.google.common.collect.FluentIterable;
20 import com.google.common.collect.ImmutableList;
21 import com.google.common.collect.Maps;
22 import com.google.common.collect.Sets;
24 import org.apache.commons.lang3.RandomUtils;
25 import org.apache.felix.scr.annotations.Activate;
26 import org.apache.felix.scr.annotations.Component;
27 import org.apache.felix.scr.annotations.Deactivate;
28 import org.apache.felix.scr.annotations.Reference;
29 import org.apache.felix.scr.annotations.ReferenceCardinality;
30 import org.apache.felix.scr.annotations.Service;
31 import org.onlab.packet.ChassisId;
32 import org.onlab.util.KryoNamespace;
33 import org.onlab.util.NewConcurrentHashMap;
34 import org.onosproject.cluster.ClusterService;
35 import org.onosproject.cluster.ControllerNode;
36 import org.onosproject.cluster.NodeId;
37 import org.onosproject.mastership.MastershipService;
38 import org.onosproject.mastership.MastershipTerm;
39 import org.onosproject.mastership.MastershipTermService;
40 import org.onosproject.net.Annotations;
41 import org.onosproject.net.AnnotationsUtil;
42 import org.onosproject.net.DefaultAnnotations;
43 import org.onosproject.net.DefaultDevice;
44 import org.onosproject.net.DefaultPort;
45 import org.onosproject.net.Device;
46 import org.onosproject.net.Device.Type;
47 import org.onosproject.net.DeviceId;
48 import org.onosproject.net.MastershipRole;
49 import org.onosproject.net.OchPort;
50 import org.onosproject.net.OduCltPort;
51 import org.onosproject.net.OmsPort;
52 import org.onosproject.net.Port;
53 import org.onosproject.net.PortNumber;
54 import org.onosproject.net.device.DefaultPortStatistics;
55 import org.onosproject.net.device.DeviceClockService;
56 import org.onosproject.net.device.DeviceDescription;
57 import org.onosproject.net.device.DeviceEvent;
58 import org.onosproject.net.device.DeviceStore;
59 import org.onosproject.net.device.DeviceStoreDelegate;
60 import org.onosproject.net.device.OchPortDescription;
61 import org.onosproject.net.device.OduCltPortDescription;
62 import org.onosproject.net.device.OmsPortDescription;
63 import org.onosproject.net.device.PortDescription;
64 import org.onosproject.net.device.PortStatistics;
65 import org.onosproject.net.provider.ProviderId;
66 import org.onosproject.store.AbstractStore;
67 import org.onosproject.store.Timestamp;
68 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
69 import org.onosproject.store.cluster.messaging.ClusterMessage;
70 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
71 import org.onosproject.store.cluster.messaging.MessageSubject;
72 import org.onosproject.store.impl.Timestamped;
73 import org.onosproject.store.serializers.KryoNamespaces;
74 import org.onosproject.store.serializers.KryoSerializer;
75 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
76 import org.onosproject.store.service.EventuallyConsistentMap;
77 import org.onosproject.store.service.EventuallyConsistentMapEvent;
78 import org.onosproject.store.service.EventuallyConsistentMapListener;
79 import org.onosproject.store.service.MultiValuedTimestamp;
80 import org.onosproject.store.service.StorageService;
81 import org.onosproject.store.service.WallClockTimestamp;
82 import org.slf4j.Logger;
84 import java.io.IOException;
85 import java.util.ArrayList;
86 import java.util.Collection;
87 import java.util.Collections;
88 import java.util.HashMap;
89 import java.util.HashSet;
90 import java.util.Iterator;
91 import java.util.List;
93 import java.util.Map.Entry;
94 import java.util.Objects;
96 import java.util.concurrent.ConcurrentMap;
97 import java.util.concurrent.ExecutorService;
98 import java.util.concurrent.Executors;
99 import java.util.concurrent.ScheduledExecutorService;
100 import java.util.concurrent.TimeUnit;
102 import static com.google.common.base.Preconditions.checkArgument;
103 import static com.google.common.base.Predicates.notNull;
104 import static com.google.common.base.Verify.verify;
105 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
106 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
107 import static org.onlab.util.Tools.groupedThreads;
108 import static org.onlab.util.Tools.minPriority;
109 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
110 import static org.onosproject.net.DefaultAnnotations.merge;
111 import static org.onosproject.net.device.DeviceEvent.Type.*;
112 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
113 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
114 import static org.slf4j.LoggerFactory.getLogger;
117 * Manages inventory of infrastructure devices using gossip protocol to distribute
120 @Component(immediate = true)
122 public class GossipDeviceStore
123 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
124 implements DeviceStore {
126 private final Logger log = getLogger(getClass());
128 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
129 // Timeout in milliseconds to process device or ports on remote master node
130 private static final int REMOTE_MASTER_TIMEOUT = 1000;
132 // innerMap is used to lock a Device, thus instance should never be replaced.
133 // collection of Description given from various providers
134 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
135 deviceDescs = Maps.newConcurrentMap();
137 // cache of Device and Ports generated by compositing descriptions from providers
138 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
139 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
141 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
142 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
143 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
144 portStatsListener = new InternalPortStatsListener();
146 // to be updated under Device lock
147 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
148 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
150 // available(=UP) devices
151 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
153 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
154 protected DeviceClockService deviceClockService;
156 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
157 protected StorageService storageService;
159 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
160 protected ClusterCommunicationService clusterCommunicator;
162 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
163 protected ClusterService clusterService;
165 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
166 protected MastershipService mastershipService;
168 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
169 protected MastershipTermService termService;
172 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
174 protected void setupKryoPool() {
175 serializerPool = KryoNamespace.newBuilder()
176 .register(DistributedStoreSerializers.STORE_COMMON)
177 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
178 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
179 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
180 .register(InternalDeviceRemovedEvent.class)
181 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
182 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
183 .register(DeviceAntiEntropyAdvertisement.class)
184 .register(DeviceFragmentId.class)
185 .register(PortFragmentId.class)
186 .register(DeviceInjectedEvent.class)
187 .register(PortInjectedEvent.class)
192 private ExecutorService executor;
194 private ScheduledExecutorService backgroundExecutor;
196 // TODO make these anti-entropy parameters configurable
197 private long initialDelaySec = 5;
198 private long periodSec = 5;
201 public void activate() {
202 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
205 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
207 clusterCommunicator.addSubscriber(
208 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
209 clusterCommunicator.addSubscriber(
210 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
211 new InternalDeviceOfflineEventListener(),
213 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
214 new InternalRemoveRequestListener(),
216 clusterCommunicator.addSubscriber(
217 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
218 clusterCommunicator.addSubscriber(
219 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
220 clusterCommunicator.addSubscriber(
221 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
222 clusterCommunicator.addSubscriber(
223 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
224 new InternalDeviceAdvertisementListener(),
226 clusterCommunicator.addSubscriber(
227 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
228 clusterCommunicator.addSubscriber(
229 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
231 // start anti-entropy thread
232 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
233 initialDelaySec, periodSec, TimeUnit.SECONDS);
235 // Create a distributed map for port stats.
236 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
237 .register(KryoNamespaces.API)
238 .register(DefaultPortStatistics.class)
239 .register(DeviceId.class)
240 .register(MultiValuedTimestamp.class)
241 .register(WallClockTimestamp.class);
243 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
244 .withName("port-stats")
245 .withSerializer(deviceDataSerializer)
246 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
247 .withTimestampProvider((k, v) -> new WallClockTimestamp())
248 .withTombstonesDisabled()
250 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
251 eventuallyConsistentMapBuilder()
252 .withName("port-stats-delta")
253 .withSerializer(deviceDataSerializer)
254 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
255 .withTimestampProvider((k, v) -> new WallClockTimestamp())
256 .withTombstonesDisabled()
258 devicePortStats.addListener(portStatsListener);
263 public void deactivate() {
264 devicePortStats.destroy();
265 devicePortDeltaStats.destroy();
266 executor.shutdownNow();
268 backgroundExecutor.shutdownNow();
270 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
271 log.error("Timeout during executor shutdown");
273 } catch (InterruptedException e) {
274 log.error("Error during executor shutdown", e);
280 availableDevices.clear();
285 public int getDeviceCount() {
286 return devices.size();
290 public Iterable<Device> getDevices() {
291 return Collections.unmodifiableCollection(devices.values());
295 public Iterable<Device> getAvailableDevices() {
296 return FluentIterable.from(getDevices())
297 .filter(input -> isAvailable(input.id()));
301 public Device getDevice(DeviceId deviceId) {
302 return devices.get(deviceId);
306 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
308 DeviceDescription deviceDescription) {
309 NodeId localNode = clusterService.getLocalNode().id();
310 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
312 // Process device update only if we're the master,
313 // otherwise signal the actual master.
314 DeviceEvent deviceEvent = null;
315 if (localNode.equals(deviceNode)) {
317 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
318 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
319 final Timestamped<DeviceDescription> mergedDesc;
320 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
322 synchronized (device) {
323 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
324 mergedDesc = device.get(providerId).getDeviceDesc();
327 if (deviceEvent != null) {
328 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
329 providerId, deviceId);
330 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
334 // FIXME Temporary hack for NPE (ONOS-1171).
335 // Proper fix is to implement forwarding to master on ConfigProvider
337 if (deviceNode == null) {
343 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
344 providerId, deviceId, deviceDescription);
346 // TODO check unicast return value
347 clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
349 log.warn("Failed to process injected device id: {} desc: {} " +
350 "(cluster messaging failed: {})",
351 deviceId, deviceDescription, e);
358 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
360 Timestamped<DeviceDescription> deltaDesc) {
362 // Collection of DeviceDescriptions for a Device
363 Map<ProviderId, DeviceDescriptions> device
364 = getOrCreateDeviceDescriptionsMap(deviceId);
366 synchronized (device) {
367 // locking per device
369 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
370 log.debug("Ignoring outdated event: {}", deltaDesc);
374 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
376 final Device oldDevice = devices.get(deviceId);
377 final Device newDevice;
379 if (deltaDesc == descs.getDeviceDesc() ||
380 deltaDesc.isNewer(descs.getDeviceDesc())) {
381 // on new device or valid update
382 descs.putDeviceDesc(deltaDesc);
383 newDevice = composeDevice(deviceId, device);
385 // outdated event, ignored.
388 if (oldDevice == null) {
390 return createDevice(providerId, newDevice, deltaDesc.timestamp());
392 // UPDATE or ignore (no change or stale)
393 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
398 // Creates the device and returns the appropriate event if necessary.
399 // Guarded by deviceDescs value (=Device lock)
400 private DeviceEvent createDevice(ProviderId providerId,
401 Device newDevice, Timestamp timestamp) {
403 // update composed device cache
404 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
405 verify(oldDevice == null,
406 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
407 providerId, oldDevice, newDevice);
409 if (!providerId.isAncillary()) {
410 markOnline(newDevice.id(), timestamp);
413 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
416 // Updates the device and returns the appropriate event if necessary.
417 // Guarded by deviceDescs value (=Device lock)
418 private DeviceEvent updateDevice(ProviderId providerId,
420 Device newDevice, Timestamp newTimestamp) {
421 // We allow only certain attributes to trigger update
422 boolean propertiesChanged =
423 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
424 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
425 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
426 boolean annotationsChanged =
427 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
429 // Primary providers can respond to all changes, but ancillary ones
430 // should respond only to annotation changes.
431 if ((providerId.isAncillary() && annotationsChanged) ||
432 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
433 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
436 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
437 providerId, oldDevice, devices.get(newDevice.id())
440 if (!providerId.isAncillary()) {
441 boolean wasOnline = availableDevices.contains(newDevice.id());
442 markOnline(newDevice.id(), newTimestamp);
444 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
448 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
454 public DeviceEvent markOffline(DeviceId deviceId) {
455 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
456 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
458 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
459 deviceId, timestamp);
460 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
465 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
467 Map<ProviderId, DeviceDescriptions> providerDescs
468 = getOrCreateDeviceDescriptionsMap(deviceId);
471 synchronized (providerDescs) {
473 // accept off-line if given timestamp is newer than
474 // the latest Timestamp from Primary provider
475 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
476 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
477 if (timestamp.compareTo(lastTimestamp) <= 0) {
478 // outdated event ignore
482 offline.put(deviceId, timestamp);
484 Device device = devices.get(deviceId);
485 if (device == null) {
488 boolean removed = availableDevices.remove(deviceId);
490 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
497 * Marks the device as available if the given timestamp is not outdated,
498 * compared to the time the device has been marked offline.
500 * @param deviceId identifier of the device
501 * @param timestamp of the event triggering this change.
502 * @return true if availability change request was accepted and changed the state
504 // Guarded by deviceDescs value (=Device lock)
505 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
506 // accept on-line if given timestamp is newer than
507 // the latest offline request Timestamp
508 Timestamp offlineTimestamp = offline.get(deviceId);
509 if (offlineTimestamp == null ||
510 offlineTimestamp.compareTo(timestamp) < 0) {
512 offline.remove(deviceId);
513 return availableDevices.add(deviceId);
519 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
521 List<PortDescription> portDescriptions) {
523 NodeId localNode = clusterService.getLocalNode().id();
524 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
525 // since it will trigger distributed store read.
526 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
527 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
528 // If we don't care much about topology performance, then it might be OK.
529 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
531 // Process port update only if we're the master of the device,
532 // otherwise signal the actual master.
533 List<DeviceEvent> deviceEvents = null;
534 if (localNode.equals(deviceNode)) {
536 final Timestamp newTimestamp;
538 newTimestamp = deviceClockService.getTimestamp(deviceId);
539 } catch (IllegalStateException e) {
540 log.info("Timestamp was not available for device {}", deviceId);
541 log.debug(" discarding {}", portDescriptions);
542 // Failed to generate timestamp.
544 // Possible situation:
545 // Device connected and became master for short period of time,
546 // but lost mastership before this instance had the chance to
547 // retrieve term information.
549 // Information dropped here is expected to be recoverable by
550 // device probing after mastership change
552 return Collections.emptyList();
554 log.debug("timestamp for {} {}", deviceId, newTimestamp);
556 final Timestamped<List<PortDescription>> timestampedInput
557 = new Timestamped<>(portDescriptions, newTimestamp);
558 final Timestamped<List<PortDescription>> merged;
560 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
562 synchronized (device) {
563 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
564 final DeviceDescriptions descs = device.get(providerId);
565 List<PortDescription> mergedList =
566 FluentIterable.from(portDescriptions)
567 .transform(new Function<PortDescription, PortDescription>() {
569 public PortDescription apply(PortDescription input) {
570 // lookup merged port description
571 return descs.getPortDesc(input.portNumber()).value();
574 merged = new Timestamped<>(mergedList, newTimestamp);
577 if (!deviceEvents.isEmpty()) {
578 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
579 providerId, deviceId);
580 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
584 // FIXME Temporary hack for NPE (ONOS-1171).
585 // Proper fix is to implement forwarding to master on ConfigProvider
587 if (deviceNode == null) {
589 return Collections.emptyList();
592 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
594 //TODO check unicast return value
595 clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
597 log.warn("Failed to process injected ports of device id: {} " +
598 "(cluster messaging failed: {})",
603 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
606 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
608 Timestamped<List<PortDescription>> portDescriptions) {
610 Device device = devices.get(deviceId);
611 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
613 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
614 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
616 List<DeviceEvent> events = new ArrayList<>();
617 synchronized (descsMap) {
619 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
620 log.debug("Ignoring outdated events: {}", portDescriptions);
621 return Collections.emptyList();
624 DeviceDescriptions descs = descsMap.get(providerId);
625 // every provider must provide DeviceDescription.
626 checkArgument(descs != null,
627 "Device description for Device ID %s from Provider %s was not found",
628 deviceId, providerId);
630 Map<PortNumber, Port> ports = getPortMap(deviceId);
632 final Timestamp newTimestamp = portDescriptions.timestamp();
635 Set<PortNumber> processed = new HashSet<>();
636 for (PortDescription portDescription : portDescriptions.value()) {
637 final PortNumber number = portDescription.portNumber();
638 processed.add(number);
640 final Port oldPort = ports.get(number);
644 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
645 if (existingPortDesc == null ||
646 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
647 // on new port or valid update
648 // update description
649 descs.putPortDesc(new Timestamped<>(portDescription,
650 portDescriptions.timestamp()));
651 newPort = composePort(device, number, descsMap);
653 // outdated event, ignored.
657 events.add(oldPort == null ?
658 createPort(device, newPort, ports) :
659 updatePort(device, oldPort, newPort, ports));
662 events.addAll(pruneOldPorts(device, ports, processed));
664 return FluentIterable.from(events).filter(notNull()).toList();
667 // Creates a new port based on the port description adds it to the map and
668 // Returns corresponding event.
669 // Guarded by deviceDescs value (=Device lock)
670 private DeviceEvent createPort(Device device, Port newPort,
671 Map<PortNumber, Port> ports) {
672 ports.put(newPort.number(), newPort);
673 return new DeviceEvent(PORT_ADDED, device, newPort);
676 // Checks if the specified port requires update and if so, it replaces the
677 // existing entry in the map and returns corresponding event.
678 // Guarded by deviceDescs value (=Device lock)
679 private DeviceEvent updatePort(Device device, Port oldPort,
681 Map<PortNumber, Port> ports) {
682 if (oldPort.isEnabled() != newPort.isEnabled() ||
683 oldPort.type() != newPort.type() ||
684 oldPort.portSpeed() != newPort.portSpeed() ||
685 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
686 ports.put(oldPort.number(), newPort);
687 return new DeviceEvent(PORT_UPDATED, device, newPort);
692 // Prunes the specified list of ports based on which ports are in the
693 // processed list and returns list of corresponding events.
694 // Guarded by deviceDescs value (=Device lock)
695 private List<DeviceEvent> pruneOldPorts(Device device,
696 Map<PortNumber, Port> ports,
697 Set<PortNumber> processed) {
698 List<DeviceEvent> events = new ArrayList<>();
699 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
700 while (iterator.hasNext()) {
701 Entry<PortNumber, Port> e = iterator.next();
702 PortNumber portNumber = e.getKey();
703 if (!processed.contains(portNumber)) {
704 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
711 // Gets the map of ports for the specified device; if one does not already
712 // exist, it creates and registers a new one.
713 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
714 return createIfAbsentUnchecked(devicePorts, deviceId,
715 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
718 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
720 Map<ProviderId, DeviceDescriptions> r;
721 r = deviceDescs.get(deviceId);
724 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
725 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
726 if (concurrentlyAdded != null) {
727 r = concurrentlyAdded;
733 // Guarded by deviceDescs value (=Device lock)
734 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
735 Map<ProviderId, DeviceDescriptions> device,
736 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
737 synchronized (device) {
738 DeviceDescriptions r = device.get(providerId);
740 r = new DeviceDescriptions(deltaDesc);
741 device.put(providerId, r);
748 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
750 PortDescription portDescription) {
751 final Timestamp newTimestamp;
753 newTimestamp = deviceClockService.getTimestamp(deviceId);
754 } catch (IllegalStateException e) {
755 log.info("Timestamp was not available for device {}", deviceId);
756 log.debug(" discarding {}", portDescription);
757 // Failed to generate timestamp. Ignoring.
758 // See updatePorts comment
761 final Timestamped<PortDescription> deltaDesc
762 = new Timestamped<>(portDescription, newTimestamp);
763 final DeviceEvent event;
764 final Timestamped<PortDescription> mergedDesc;
765 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
766 synchronized (device) {
767 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
768 mergedDesc = device.get(providerId)
769 .getPortDesc(portDescription.portNumber());
772 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
773 providerId, deviceId);
774 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
779 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
780 Timestamped<PortDescription> deltaDesc) {
781 Device device = devices.get(deviceId);
782 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
784 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
785 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
787 synchronized (descsMap) {
789 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
790 log.debug("Ignoring outdated event: {}", deltaDesc);
794 DeviceDescriptions descs = descsMap.get(providerId);
795 // assuming all providers must to give DeviceDescription
796 verify(descs != null,
797 "Device description for Device ID %s from Provider %s was not found",
798 deviceId, providerId);
800 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
801 final PortNumber number = deltaDesc.value().portNumber();
802 final Port oldPort = ports.get(number);
805 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
806 if (existingPortDesc == null ||
807 deltaDesc.isNewer(existingPortDesc)) {
808 // on new port or valid update
809 // update description
810 descs.putPortDesc(deltaDesc);
811 newPort = composePort(device, number, descsMap);
813 // same or outdated event, ignored.
814 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
818 if (oldPort == null) {
819 return createPort(device, newPort, ports);
821 return updatePort(device, oldPort, newPort, ports);
827 public List<Port> getPorts(DeviceId deviceId) {
828 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
830 return Collections.emptyList();
832 return ImmutableList.copyOf(ports.values());
836 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
837 Collection<PortStatistics> newStatsCollection) {
839 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
840 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
841 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
843 if (prvStatsMap != null) {
844 for (PortStatistics newStats : newStatsCollection) {
845 PortNumber port = PortNumber.portNumber(newStats.port());
846 PortStatistics prvStats = prvStatsMap.get(port);
847 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
848 PortStatistics deltaStats = builder.build();
849 if (prvStats != null) {
850 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
852 deltaStatsMap.put(port, deltaStats);
853 newStatsMap.put(port, newStats);
856 for (PortStatistics newStats : newStatsCollection) {
857 PortNumber port = PortNumber.portNumber(newStats.port());
858 newStatsMap.put(port, newStats);
861 devicePortDeltaStats.put(deviceId, deltaStatsMap);
862 devicePortStats.put(deviceId, newStatsMap);
863 // DeviceEvent returns null because of InternalPortStatsListener usage
868 * Calculate delta statistics by subtracting previous from new statistics.
870 * @param deviceId device identifier
871 * @param prvStats previous port statistics
872 * @param newStats new port statistics
873 * @return PortStatistics
875 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
876 // calculate time difference
877 long deltaStatsSec, deltaStatsNano;
878 if (newStats.durationNano() < prvStats.durationNano()) {
879 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
880 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
882 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
883 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
885 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
886 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
887 .setPort(newStats.port())
888 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
889 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
890 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
891 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
892 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
893 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
894 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
895 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
896 .setDurationSec(deltaStatsSec)
897 .setDurationNano(deltaStatsNano)
903 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
904 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
905 if (portStats == null) {
906 return Collections.emptyList();
908 return ImmutableList.copyOf(portStats.values());
912 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
913 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
914 if (portStats == null) {
915 return Collections.emptyList();
917 return ImmutableList.copyOf(portStats.values());
921 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
922 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
923 return ports == null ? null : ports.get(portNumber);
927 public boolean isAvailable(DeviceId deviceId) {
928 return availableDevices.contains(deviceId);
932 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
933 final NodeId myId = clusterService.getLocalNode().id();
934 NodeId master = mastershipService.getMasterFor(deviceId);
936 // if there exist a master, forward
937 // if there is no master, try to become one and process
939 boolean relinquishAtEnd = false;
940 if (master == null) {
941 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
942 if (myRole != MastershipRole.NONE) {
943 relinquishAtEnd = true;
945 log.debug("Temporarily requesting role for {} to remove", deviceId);
946 mastershipService.requestRoleFor(deviceId);
947 MastershipTerm term = termService.getMastershipTerm(deviceId);
948 if (term != null && myId.equals(term.master())) {
953 if (!myId.equals(master)) {
954 log.debug("{} has control of {}, forwarding remove request",
957 // TODO check unicast return value
958 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
960 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
963 // event will be triggered after master processes it.
969 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
970 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
972 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
974 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
976 if (relinquishAtEnd) {
977 log.debug("Relinquishing temporary role acquired for {}", deviceId);
978 mastershipService.relinquishMastership(deviceId);
983 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
984 Timestamp timestamp) {
986 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
987 synchronized (descs) {
988 // accept removal request if given timestamp is newer than
989 // the latest Timestamp from Primary provider
990 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
991 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
992 if (timestamp.compareTo(lastTimestamp) <= 0) {
993 // outdated event ignore
996 removalRequest.put(deviceId, timestamp);
998 Device device = devices.remove(deviceId);
999 // should DEVICE_REMOVED carry removed ports?
1000 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1001 if (ports != null) {
1004 markOfflineInternal(deviceId, timestamp);
1006 return device == null ? null :
1007 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1012 * Checks if given timestamp is superseded by removal request
1013 * with more recent timestamp.
1015 * @param deviceId identifier of a device
1016 * @param timestampToCheck timestamp of an event to check
1017 * @return true if device is already removed
1019 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1020 Timestamp removalTimestamp = removalRequest.get(deviceId);
1021 if (removalTimestamp != null &&
1022 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1023 // removalRequest is more recent
1030 * Returns a Device, merging description given from multiple Providers.
1032 * @param deviceId device identifier
1033 * @param providerDescs Collection of Descriptions from multiple providers
1034 * @return Device instance
1036 private Device composeDevice(DeviceId deviceId,
1037 Map<ProviderId, DeviceDescriptions> providerDescs) {
1039 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1041 ProviderId primary = pickPrimaryPID(providerDescs);
1043 DeviceDescriptions desc = providerDescs.get(primary);
1045 final DeviceDescription base = desc.getDeviceDesc().value();
1046 Type type = base.type();
1047 String manufacturer = base.manufacturer();
1048 String hwVersion = base.hwVersion();
1049 String swVersion = base.swVersion();
1050 String serialNumber = base.serialNumber();
1051 ChassisId chassisId = base.chassisId();
1052 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1053 annotations = merge(annotations, base.annotations());
1055 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1056 if (e.getKey().equals(primary)) {
1059 // Note: should keep track of Description timestamp in the future
1060 // and only merge conflicting keys when timestamp is newer.
1061 // Currently assuming there will never be a key conflict between
1064 // annotation merging. not so efficient, should revisit later
1065 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1068 return new DefaultDevice(primary, deviceId, type, manufacturer,
1069 hwVersion, swVersion, serialNumber,
1070 chassisId, annotations);
1073 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1074 PortDescription description, Annotations annotations) {
1075 switch (description.type()) {
1077 OmsPortDescription omsDesc = (OmsPortDescription) description;
1078 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1079 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1081 OchPortDescription ochDesc = (OchPortDescription) description;
1082 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1083 ochDesc.isTunable(), ochDesc.lambda(), annotations);
1085 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1086 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1088 return new DefaultPort(device, number, isEnabled, description.type(),
1089 description.portSpeed(), annotations);
1094 * Returns a Port, merging description given from multiple Providers.
1096 * @param device device the port is on
1097 * @param number port number
1098 * @param descsMap Collection of Descriptions from multiple providers
1099 * @return Port instance
1101 private Port composePort(Device device, PortNumber number,
1102 Map<ProviderId, DeviceDescriptions> descsMap) {
1104 ProviderId primary = pickPrimaryPID(descsMap);
1105 DeviceDescriptions primDescs = descsMap.get(primary);
1106 // if no primary, assume not enabled
1107 boolean isEnabled = false;
1108 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1109 Timestamp newest = null;
1110 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1111 if (portDesc != null) {
1112 isEnabled = portDesc.value().isEnabled();
1113 annotations = merge(annotations, portDesc.value().annotations());
1114 newest = portDesc.timestamp();
1116 Port updated = null;
1117 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1118 if (e.getKey().equals(primary)) {
1121 // Note: should keep track of Description timestamp in the future
1122 // and only merge conflicting keys when timestamp is newer.
1123 // Currently assuming there will never be a key conflict between
1126 // annotation merging. not so efficient, should revisit later
1127 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1128 if (otherPortDesc != null) {
1129 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1132 annotations = merge(annotations, otherPortDesc.value().annotations());
1133 PortDescription other = otherPortDesc.value();
1134 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1135 newest = otherPortDesc.timestamp();
1138 if (portDesc == null) {
1139 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1141 PortDescription current = portDesc.value();
1142 return updated == null
1143 ? buildTypedPort(device, number, isEnabled, current, annotations)
1148 * @return primary ProviderID, or randomly chosen one if none exists
1150 private ProviderId pickPrimaryPID(
1151 Map<ProviderId, DeviceDescriptions> providerDescs) {
1152 ProviderId fallBackPrimary = null;
1153 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1154 if (!e.getKey().isAncillary()) {
1156 } else if (fallBackPrimary == null) {
1157 // pick randomly as a fallback in case there is no primary
1158 fallBackPrimary = e.getKey();
1161 return fallBackPrimary;
1164 private DeviceDescriptions getPrimaryDescriptions(
1165 Map<ProviderId, DeviceDescriptions> providerDescs) {
1166 ProviderId pid = pickPrimaryPID(providerDescs);
1167 return providerDescs.get(pid);
1170 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1171 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1174 private void broadcastMessage(MessageSubject subject, Object event) {
1175 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1178 private void notifyPeers(InternalDeviceEvent event) {
1179 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1182 private void notifyPeers(InternalDeviceOfflineEvent event) {
1183 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1186 private void notifyPeers(InternalDeviceRemovedEvent event) {
1187 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1190 private void notifyPeers(InternalPortEvent event) {
1191 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1194 private void notifyPeers(InternalPortStatusEvent event) {
1195 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1198 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1200 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1201 } catch (IOException e) {
1202 log.error("Failed to send" + event + " to " + recipient, e);
1206 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1208 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1209 } catch (IOException e) {
1210 log.error("Failed to send" + event + " to " + recipient, e);
1214 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1216 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1217 } catch (IOException e) {
1218 log.error("Failed to send" + event + " to " + recipient, e);
1222 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1224 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1225 } catch (IOException e) {
1226 log.error("Failed to send" + event + " to " + recipient, e);
1230 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1232 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1233 } catch (IOException e) {
1234 log.error("Failed to send" + event + " to " + recipient, e);
1238 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1239 final NodeId self = clusterService.getLocalNode().id();
1241 final int numDevices = deviceDescs.size();
1242 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1243 final int portsPerDevice = 8; // random factor to minimize reallocation
1244 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1245 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1247 deviceDescs.forEach((deviceId, devDescs) -> {
1249 // for each Device...
1250 synchronized (devDescs) {
1252 // send device offline timestamp
1253 Timestamp lOffline = this.offline.get(deviceId);
1254 if (lOffline != null) {
1255 adOffline.put(deviceId, lOffline);
1258 for (Entry<ProviderId, DeviceDescriptions>
1259 prov : devDescs.entrySet()) {
1261 // for each Provider Descriptions...
1262 final ProviderId provId = prov.getKey();
1263 final DeviceDescriptions descs = prov.getValue();
1265 adDevices.put(new DeviceFragmentId(deviceId, provId),
1266 descs.getDeviceDesc().timestamp());
1268 for (Entry<PortNumber, Timestamped<PortDescription>>
1269 portDesc : descs.getPortDescs().entrySet()) {
1271 final PortNumber number = portDesc.getKey();
1272 adPorts.put(new PortFragmentId(deviceId, provId, number),
1273 portDesc.getValue().timestamp());
1279 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1283 * Responds to anti-entropy advertisement message.
1285 * Notify sender about out-dated information using regular replication message.
1286 * Send back advertisement to sender if not in sync.
1288 * @param advertisement to respond to
1290 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1292 final NodeId sender = advertisement.sender();
1294 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1295 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1296 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1298 // Fragments to request
1299 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1300 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1302 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1303 final DeviceId deviceId = de.getKey();
1304 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1306 synchronized (lDevice) {
1307 // latestTimestamp across provider
1308 // Note: can be null initially
1309 Timestamp localLatest = offline.get(deviceId);
1311 // handle device Ads
1312 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1313 final ProviderId provId = prov.getKey();
1314 final DeviceDescriptions lDeviceDescs = prov.getValue();
1316 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1319 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1320 Timestamp advDevTimestamp = devAds.get(devFragId);
1322 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1324 // remote does not have it or outdated, suggest
1325 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1326 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1327 // local is outdated, request
1328 reqDevices.add(devFragId);
1332 for (Entry<PortNumber, Timestamped<PortDescription>>
1333 pe : lDeviceDescs.getPortDescs().entrySet()) {
1335 final PortNumber num = pe.getKey();
1336 final Timestamped<PortDescription> lPort = pe.getValue();
1338 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1340 Timestamp advPortTimestamp = portAds.get(portFragId);
1341 if (advPortTimestamp == null || lPort.isNewerThan(
1342 advPortTimestamp)) {
1343 // remote does not have it or outdated, suggest
1344 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1345 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1346 // local is outdated, request
1347 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1348 reqPorts.add(portFragId);
1351 // remove port Ad already processed
1352 portAds.remove(portFragId);
1353 } // end local port loop
1355 // remove device Ad already processed
1356 devAds.remove(devFragId);
1358 // find latest and update
1359 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1360 if (localLatest == null ||
1361 providerLatest.compareTo(localLatest) > 0) {
1362 localLatest = providerLatest;
1364 } // end local provider loop
1366 // checking if remote timestamp is more recent.
1367 Timestamp rOffline = offlineAds.get(deviceId);
1368 if (rOffline != null &&
1369 rOffline.compareTo(localLatest) > 0) {
1370 // remote offline timestamp suggests that the
1371 // device is off-line
1372 markOfflineInternal(deviceId, rOffline);
1375 Timestamp lOffline = offline.get(deviceId);
1376 if (lOffline != null && rOffline == null) {
1377 // locally offline, but remote is online, suggest offline
1378 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1381 // remove device offline Ad already processed
1382 offlineAds.remove(deviceId);
1383 } // end local device loop
1386 // If there is any Ads left, request them
1387 log.trace("Ads left {}, {}", devAds, portAds);
1388 reqDevices.addAll(devAds.keySet());
1389 reqPorts.addAll(portAds.keySet());
1391 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1392 log.trace("Nothing to request to remote peer {}", sender);
1396 log.debug("Need to sync {} {}", reqDevices, reqPorts);
1398 // 2-way Anti-Entropy for now
1400 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1401 } catch (IOException e) {
1402 log.error("Failed to send response advertisement to " + sender, e);
1405 // Sketch of 3-way Anti-Entropy
1406 // DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1407 // ClusterMessage message = new ClusterMessage(
1408 // clusterService.getLocalNode().id(),
1409 // GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1410 // SERIALIZER.encode(request));
1413 // clusterCommunicator.unicast(message, advertisement.sender());
1414 // } catch (IOException e) {
1415 // log.error("Failed to send advertisement reply to "
1416 // + advertisement.sender(), e);
1420 private void notifyDelegateIfNotNull(DeviceEvent event) {
1421 if (event != null) {
1422 notifyDelegate(event);
1426 private final class SendAdvertisementTask implements Runnable {
1430 if (Thread.currentThread().isInterrupted()) {
1431 log.debug("Interrupted, quitting");
1436 final NodeId self = clusterService.getLocalNode().id();
1437 Set<ControllerNode> nodes = clusterService.getNodes();
1439 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1440 .transform(toNodeId())
1443 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1444 log.trace("No other peers in the cluster.");
1450 int idx = RandomUtils.nextInt(0, nodeIds.size());
1451 peer = nodeIds.get(idx);
1452 } while (peer.equals(self));
1454 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1456 if (Thread.currentThread().isInterrupted()) {
1457 log.debug("Interrupted, quitting");
1462 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1463 } catch (IOException e) {
1464 log.debug("Failed to send anti-entropy advertisement to {}", peer);
1467 } catch (Exception e) {
1468 // catch all Exception to avoid Scheduled task being suppressed.
1469 log.error("Exception thrown while sending advertisement", e);
1474 private final class InternalDeviceEventListener
1475 implements ClusterMessageHandler {
1477 public void handle(ClusterMessage message) {
1478 log.debug("Received device update event from peer: {}", message.sender());
1479 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1481 ProviderId providerId = event.providerId();
1482 DeviceId deviceId = event.deviceId();
1483 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1486 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1487 } catch (Exception e) {
1488 log.warn("Exception thrown handling device update", e);
1493 private final class InternalDeviceOfflineEventListener
1494 implements ClusterMessageHandler {
1496 public void handle(ClusterMessage message) {
1497 log.debug("Received device offline event from peer: {}", message.sender());
1498 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1500 DeviceId deviceId = event.deviceId();
1501 Timestamp timestamp = event.timestamp();
1504 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1505 } catch (Exception e) {
1506 log.warn("Exception thrown handling device offline", e);
1511 private final class InternalRemoveRequestListener
1512 implements ClusterMessageHandler {
1514 public void handle(ClusterMessage message) {
1515 log.debug("Received device remove request from peer: {}", message.sender());
1516 DeviceId did = SERIALIZER.decode(message.payload());
1520 } catch (Exception e) {
1521 log.warn("Exception thrown handling device remove", e);
1526 private final class InternalDeviceRemovedEventListener
1527 implements ClusterMessageHandler {
1529 public void handle(ClusterMessage message) {
1530 log.debug("Received device removed event from peer: {}", message.sender());
1531 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1533 DeviceId deviceId = event.deviceId();
1534 Timestamp timestamp = event.timestamp();
1537 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1538 } catch (Exception e) {
1539 log.warn("Exception thrown handling device removed", e);
1544 private final class InternalPortEventListener
1545 implements ClusterMessageHandler {
1547 public void handle(ClusterMessage message) {
1549 log.debug("Received port update event from peer: {}", message.sender());
1550 InternalPortEvent event = SERIALIZER.decode(message.payload());
1552 ProviderId providerId = event.providerId();
1553 DeviceId deviceId = event.deviceId();
1554 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1556 if (getDevice(deviceId) == null) {
1557 log.debug("{} not found on this node yet, ignoring.", deviceId);
1558 // Note: dropped information will be recovered by anti-entropy
1563 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1564 } catch (Exception e) {
1565 log.warn("Exception thrown handling port update", e);
1570 private final class InternalPortStatusEventListener
1571 implements ClusterMessageHandler {
1573 public void handle(ClusterMessage message) {
1575 log.debug("Received port status update event from peer: {}", message.sender());
1576 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1578 ProviderId providerId = event.providerId();
1579 DeviceId deviceId = event.deviceId();
1580 Timestamped<PortDescription> portDescription = event.portDescription();
1582 if (getDevice(deviceId) == null) {
1583 log.debug("{} not found on this node yet, ignoring.", deviceId);
1584 // Note: dropped information will be recovered by anti-entropy
1589 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1590 } catch (Exception e) {
1591 log.warn("Exception thrown handling port update", e);
1596 private final class InternalDeviceAdvertisementListener
1597 implements ClusterMessageHandler {
1599 public void handle(ClusterMessage message) {
1600 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1601 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1603 handleAdvertisement(advertisement);
1604 } catch (Exception e) {
1605 log.warn("Exception thrown handling Device advertisements.", e);
1610 private final class DeviceInjectedEventListener
1611 implements ClusterMessageHandler {
1613 public void handle(ClusterMessage message) {
1614 log.debug("Received injected device event from peer: {}", message.sender());
1615 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1617 ProviderId providerId = event.providerId();
1618 DeviceId deviceId = event.deviceId();
1619 DeviceDescription deviceDescription = event.deviceDescription();
1620 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1621 // workaround for ONOS-1208
1622 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1627 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1628 } catch (Exception e) {
1629 log.warn("Exception thrown handling device injected event.", e);
1634 private final class PortInjectedEventListener
1635 implements ClusterMessageHandler {
1637 public void handle(ClusterMessage message) {
1638 log.debug("Received injected port event from peer: {}", message.sender());
1639 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1641 ProviderId providerId = event.providerId();
1642 DeviceId deviceId = event.deviceId();
1643 List<PortDescription> portDescriptions = event.portDescriptions();
1644 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1645 // workaround for ONOS-1208
1646 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1651 updatePorts(providerId, deviceId, portDescriptions);
1652 } catch (Exception e) {
1653 log.warn("Exception thrown handling port injected event.", e);
1658 private class InternalPortStatsListener
1659 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1661 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1662 if (event.type() == PUT) {
1663 Device device = devices.get(event.key());
1664 if (device != null) {
1665 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));