2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.device.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
23 import org.apache.commons.lang3.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onlab.packet.ChassisId;
31 import org.onlab.util.KryoNamespace;
32 import org.onlab.util.NewConcurrentHashMap;
33 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.mastership.MastershipTerm;
38 import org.onosproject.mastership.MastershipTermService;
39 import org.onosproject.net.Annotations;
40 import org.onosproject.net.AnnotationsUtil;
41 import org.onosproject.net.DefaultAnnotations;
42 import org.onosproject.net.DefaultDevice;
43 import org.onosproject.net.DefaultPort;
44 import org.onosproject.net.Device;
45 import org.onosproject.net.Device.Type;
46 import org.onosproject.net.DeviceId;
47 import org.onosproject.net.MastershipRole;
48 import org.onosproject.net.OchPort;
49 import org.onosproject.net.OduCltPort;
50 import org.onosproject.net.OmsPort;
51 import org.onosproject.net.Port;
52 import org.onosproject.net.PortNumber;
53 import org.onosproject.net.device.DefaultPortStatistics;
54 import org.onosproject.net.device.DeviceClockService;
55 import org.onosproject.net.device.DeviceDescription;
56 import org.onosproject.net.device.DeviceEvent;
57 import org.onosproject.net.device.DeviceStore;
58 import org.onosproject.net.device.DeviceStoreDelegate;
59 import org.onosproject.net.device.OchPortDescription;
60 import org.onosproject.net.device.OduCltPortDescription;
61 import org.onosproject.net.device.OmsPortDescription;
62 import org.onosproject.net.device.PortDescription;
63 import org.onosproject.net.device.PortStatistics;
64 import org.onosproject.net.provider.ProviderId;
65 import org.onosproject.store.AbstractStore;
66 import org.onosproject.store.Timestamp;
67 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
68 import org.onosproject.store.cluster.messaging.ClusterMessage;
69 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
70 import org.onosproject.store.cluster.messaging.MessageSubject;
71 import org.onosproject.store.impl.Timestamped;
72 import org.onosproject.store.serializers.KryoNamespaces;
73 import org.onosproject.store.serializers.KryoSerializer;
74 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 import org.onosproject.store.service.EventuallyConsistentMap;
76 import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 import org.onosproject.store.service.EventuallyConsistentMapListener;
78 import org.onosproject.store.service.MultiValuedTimestamp;
79 import org.onosproject.store.service.StorageService;
80 import org.onosproject.store.service.WallClockTimestamp;
81 import org.slf4j.Logger;
83 import java.io.IOException;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.List;
92 import java.util.Map.Entry;
93 import java.util.Objects;
95 import java.util.concurrent.ConcurrentMap;
96 import java.util.concurrent.ExecutorService;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledExecutorService;
99 import java.util.concurrent.TimeUnit;
101 import static com.google.common.base.Preconditions.checkArgument;
102 import static com.google.common.base.Predicates.notNull;
103 import static com.google.common.base.Verify.verify;
104 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
105 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
106 import static org.onlab.util.Tools.groupedThreads;
107 import static org.onlab.util.Tools.minPriority;
108 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
109 import static org.onosproject.net.DefaultAnnotations.merge;
110 import static org.onosproject.net.device.DeviceEvent.Type.*;
111 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
112 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
113 import static org.slf4j.LoggerFactory.getLogger;
116 * Manages inventory of infrastructure devices using gossip protocol to distribute
119 @Component(immediate = true)
121 public class GossipDeviceStore
122 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
123 implements DeviceStore {
// SLF4J logger obtained for the runtime class of this store.
125 private final Logger log = getLogger(getClass());
// Message template used by the checkArgument calls that require a known device.
127 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
128 // Timeout in milliseconds to process device or ports on remote master node
129 private static final int REMOTE_MASTER_TIMEOUT = 1000;
131 // innerMap is used to lock a Device, thus instance should never be replaced.
132 // collection of Description given from various providers
// Methods in this class synchronize on the inner Map value as the per-device lock.
133 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
134 deviceDescs = Maps.newConcurrentMap();
136 // cache of Device and Ports generated by compositing descriptions from providers
137 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
138 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// Latest and delta port statistics per device; both maps are built in activate().
140 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
141 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
// Listener that activate() attaches to devicePortStats.
142 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
143 portStatsListener = new InternalPortStatsListener();
145 // to be updated under Device lock
// offline: timestamp of the accepted offline request per device (see markOfflineInternal / markOnline).
146 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
147 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
149 // available(=UP) devices
150 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
// Service references; each is declared with mandatory-unary cardinality.
152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
153 protected DeviceClockService deviceClockService;
155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
156 protected StorageService storageService;
158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
159 protected ClusterCommunicationService clusterCommunicator;
161 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
162 protected ClusterService clusterService;
164 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
165 protected MastershipService mastershipService;
167 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168 protected MastershipTermService termService;
// Kryo-based serializer for this store; its encode method is what the
// unicast() calls in createOrUpdateDevice and updatePorts use.
// The pool registers the STORE_COMMON namespace and then, starting at
// STORE_CUSTOM_BEGIN, this store's internal event, advertisement, fragment-id
// and injected-event classes.
// NOTE(review): the end of this initializer (after the last register call)
// is not visible in this listing — confirm against the full file.
171 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
173 protected void setupKryoPool() {
174 serializerPool = KryoNamespace.newBuilder()
175 .register(DistributedStoreSerializers.STORE_COMMON)
176 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
177 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
178 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
179 .register(InternalDeviceRemovedEvent.class)
180 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
181 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
182 .register(DeviceAntiEntropyAdvertisement.class)
183 .register(DeviceFragmentId.class)
184 .register(PortFragmentId.class)
185 .register(DeviceInjectedEvent.class)
186 .register(PortInjectedEvent.class)
// Executor handed to the cluster message subscribers registered in activate().
191 private ExecutorService executor;
// Scheduler on which activate() runs the periodic SendAdvertisementTask.
193 private ScheduledExecutorService backgroundExecutor;
195 // TODO make these anti-entropy parameters configurable
// Both values are passed to scheduleAtFixedRate with TimeUnit.SECONDS.
196 private long initialDelaySec = 5;
197 private long periodSec = 5;
// Start-up hook: creates the foreground thread pool, subscribes a handler for
// each device-store message subject, schedules the anti-entropy advertisement
// task and builds the two eventually consistent port-statistics maps.
// NOTE(review): several short lines (assignment targets, trailing executor
// arguments, build() calls) appear to be missing from this listing — confirm
// against the full file.
200 public void activate() {
201 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
// Single-thread scheduler whose threads are wrapped with minPriority.
204 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
// One subscriber per message subject.
206 clusterCommunicator.addSubscriber(
207 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
208 clusterCommunicator.addSubscriber(
209 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
210 new InternalDeviceOfflineEventListener(),
212 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
213 new InternalRemoveRequestListener(),
215 clusterCommunicator.addSubscriber(
216 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
217 clusterCommunicator.addSubscriber(
218 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
219 clusterCommunicator.addSubscriber(
220 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
221 clusterCommunicator.addSubscriber(
222 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
223 new InternalDeviceAdvertisementListener(),
225 clusterCommunicator.addSubscriber(
226 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
227 clusterCommunicator.addSubscriber(
228 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
230 // start anti-entropy thread
231 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
232 initialDelaySec, periodSec, TimeUnit.SECONDS);
234 // Create a distributed map for port stats.
// The same serializer builder is shared by both maps below.
235 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
236 .register(KryoNamespaces.API)
237 .register(DefaultPortStatistics.class)
238 .register(DeviceId.class)
239 .register(MultiValuedTimestamp.class)
240 .register(WallClockTimestamp.class);
// "port-stats": latest statistics per device, timestamped with wall-clock time.
242 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
243 .withName("port-stats")
244 .withSerializer(deviceDataSerializer)
245 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247 .withTombstonesDisabled()
// "port-stats-delta": configured identically, holds the computed deltas.
249 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
250 eventuallyConsistentMapBuilder()
251 .withName("port-stats-delta")
252 .withSerializer(deviceDataSerializer)
253 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
254 .withTimestampProvider((k, v) -> new WallClockTimestamp())
255 .withTombstonesDisabled()
// Only the latest-statistics map gets the listener.
257 devicePortStats.addListener(portStatsListener);
// Shutdown hook: destroys both port-statistics maps, stops both executors
// and clears availableDevices.
// NOTE(review): other clean-up statements may be missing from this listing.
262 public void deactivate() {
263 devicePortStats.destroy();
264 devicePortDeltaStats.destroy();
265 executor.shutdownNow();
267 backgroundExecutor.shutdownNow();
// Wait up to 5 seconds for the background executor to terminate.
269 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
270 log.error("Timeout during executor shutdown");
// NOTE(review): the interruption is only logged in the visible lines; the
// thread's interrupt flag does not appear to be restored — confirm.
272 } catch (InterruptedException e) {
273 log.error("Error during executor shutdown", e);
279 availableDevices.clear();
// Returns the number of entries in the composed-device cache.
284 public int getDeviceCount() {
285 return devices.size();
// Returns the cached devices wrapped in an unmodifiable collection.
289 public Iterable<Device> getDevices() {
290 return Collections.unmodifiableCollection(devices.values());
// Returns the devices from getDevices() for which isAvailable() is true.
294 public Iterable<Device> getAvailableDevices() {
295 return FluentIterable.from(getDevices())
296 .filter(input -> isAvailable(input.id()));
// Looks up the cached Device for deviceId; the result is whatever the
// devices map returns for that key.
300 public Device getDevice(DeviceId deviceId) {
301 return devices.get(deviceId);
// Creates or updates a device from a provider-supplied description.
// If the local node is the device's master (per mastershipService), the
// description is timestamped and applied under the per-device lock; when an
// event results, the merged description is passed to notifyPeers.
// Otherwise the update is wrapped in a DeviceInjectedEvent and unicast to
// the master node.
// NOTE(review): the deviceId parameter line, the else/try structure and the
// return statements are not visible in this listing — confirm against the
// full file.
305 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
307 DeviceDescription deviceDescription) {
308 NodeId localNode = clusterService.getLocalNode().id();
309 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
311 // Process device update only if we're the master,
312 // otherwise signal the actual master.
313 DeviceEvent deviceEvent = null;
314 if (localNode.equals(deviceNode)) {
// Stamp the incoming description with the device clock before applying it.
316 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
317 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
318 final Timestamped<DeviceDescription> mergedDesc;
319 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
// Read the stored description under the same lock that applied the update.
321 synchronized (device) {
322 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
323 mergedDesc = device.get(providerId).getDeviceDesc();
// Peers are only told when the local update produced an event.
326 if (deviceEvent != null) {
327 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
328 providerId, deviceId);
329 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
333 // FIXME Temporary hack for NPE (ONOS-1171).
334 // Proper fix is to implement forwarding to master on ConfigProvider
336 if (deviceNode == null) {
// Not the master: forward the raw description to the master node.
342 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
343 providerId, deviceId, deviceDescription);
345 // TODO check unicast return value
346 clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
348 log.warn("Failed to process injected device id: {} desc: {} " +
349 "(cluster messaging failed: {})",
350 deviceId, deviceDescription, e);
// Applies a timestamped device description under the per-device lock.
// A description for which isDeviceRemoved() is true is logged as outdated.
// Otherwise it is stored when it is the provider's first description or
// newer than the stored one, and the method delegates to createDevice or
// updateDevice depending on whether a cached Device already exists.
// NOTE(review): the deviceId parameter line, the else branch and the
// early-return statements are not visible in this listing.
357 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
359 Timestamped<DeviceDescription> deltaDesc) {
361 // Collection of DeviceDescriptions for a Device
362 Map<ProviderId, DeviceDescriptions> device
363 = getOrCreateDeviceDescriptionsMap(deviceId);
365 synchronized (device) {
366 // locking per device
368 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
369 log.debug("Ignoring outdated event: {}", deltaDesc);
373 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
375 final Device oldDevice = devices.get(deviceId);
376 final Device newDevice;
// Identity match means descs was just created from deltaDesc itself.
378 if (deltaDesc == descs.getDeviceDesc() ||
379 deltaDesc.isNewer(descs.getDeviceDesc())) {
380 // on new device or valid update
381 descs.putDeviceDesc(deltaDesc);
382 newDevice = composeDevice(deviceId, device);
384 // outdated event, ignored.
387 if (oldDevice == null) {
389 return createDevice(providerId, newDevice, deltaDesc.timestamp());
391 // UPDATE or ignore (no change or stale)
392 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
397 // Creates the device and returns the appropriate event if necessary.
398 // Guarded by deviceDescs value (=Device lock)
// Inserts newDevice with putIfAbsent and verifies that no entry was already
// cached. The device is marked online only for non-ancillary providers.
// Returns a DEVICE_ADDED event for newDevice.
399 private DeviceEvent createDevice(ProviderId providerId,
400 Device newDevice, Timestamp timestamp) {
402 // update composed device cache
403 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
404 verify(oldDevice == null,
405 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
406 providerId, oldDevice, newDevice);
408 if (!providerId.isAncillary()) {
409 markOnline(newDevice.id(), timestamp);
412 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
415 // Updates the device and returns the appropriate event if necessary.
416 // Guarded by deviceDescs value (=Device lock)
// Compares hwVersion, swVersion, providerId and annotations of the old and
// new device. When a relevant change is found, the cache entry is replaced
// and a DEVICE_UPDATED event is created. For non-ancillary providers the
// device is also marked online.
// NOTE(review): the oldDevice parameter line, the verify call head, the
// condition guarding the availability notification and the return statement
// are not visible in this listing.
417 private DeviceEvent updateDevice(ProviderId providerId,
419 Device newDevice, Timestamp newTimestamp) {
420 // We allow only certain attributes to trigger update
421 boolean propertiesChanged =
422 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
423 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
424 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
425 boolean annotationsChanged =
426 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
428 // Primary providers can respond to all changes, but ancillary ones
429 // should respond only to annotation changes.
430 DeviceEvent event = null;
431 if ((providerId.isAncillary() && annotationsChanged) ||
432 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
// Conditional replace: succeeds only if the cache still holds oldDevice.
433 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
436 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
437 providerId, oldDevice, devices.get(newDevice.id())
441 event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
444 if (!providerId.isAncillary()) {
// Capture the previous availability before marking the device online.
445 boolean wasOnline = availableDevices.contains(newDevice.id());
446 markOnline(newDevice.id(), newTimestamp);
448 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
// Marks a device offline using a fresh timestamp from the device clock and
// passes an InternalDeviceOfflineEvent to notifyPeers.
// NOTE(review): the condition guarding the notification and the return
// statement are not visible in this listing.
455 public DeviceEvent markOffline(DeviceId deviceId) {
456 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
457 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
459 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
460 deviceId, timestamp);
461 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
// Applies an offline request under the per-device lock.
// The request is treated as outdated when its timestamp is not newer than
// the latest timestamp of the primary provider's descriptions. Otherwise the
// timestamp is recorded in offline and the device is removed from
// availableDevices.
// NOTE(review): the return statements for the outdated, unknown-device and
// not-removed cases are not visible in this listing.
466 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
468 Map<ProviderId, DeviceDescriptions> providerDescs
469 = getOrCreateDeviceDescriptionsMap(deviceId);
472 synchronized (providerDescs) {
474 // accept off-line if given timestamp is newer than
475 // the latest Timestamp from Primary provider
476 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
477 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
478 if (timestamp.compareTo(lastTimestamp) <= 0) {
479 // outdated event ignore
// Remember the accepted offline timestamp; markOnline compares against it.
483 offline.put(deviceId, timestamp);
485 Device device = devices.get(deviceId);
486 if (device == null) {
489 boolean removed = availableDevices.remove(deviceId);
491 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
498 * Marks the device as available if the given timestamp is not outdated,
499 * compared to the time the device has been marked offline.
501 * @param deviceId identifier of the device
502 * @param timestamp of the event triggering this change.
503 * @return true if availability change request was accepted and changed the state
505 // Guarded by deviceDescs value (=Device lock)
// Accepts the request when no offline timestamp is recorded for the device
// or the recorded one is strictly older than the given timestamp. On
// acceptance the offline record is dropped and the result of
// availableDevices.add() is returned.
// NOTE(review): the return for the rejected case is not visible in this listing.
506 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
507 // accept on-line if given timestamp is newer than
508 // the latest offline request Timestamp
509 Timestamp offlineTimestamp = offline.get(deviceId);
510 if (offlineTimestamp == null ||
511 offlineTimestamp.compareTo(timestamp) < 0) {
513 offline.remove(deviceId);
514 return availableDevices.add(deviceId);
// Updates the list of port descriptions reported by a provider for a device.
// If the local node is the device's master, the descriptions are timestamped
// and applied through updatePortsInternal under the per-device lock; when
// events result, the merged descriptions are passed to notifyPeers.
// Otherwise a PortInjectedEvent is unicast to the master node.
// Returns an empty list, never null, when no events were produced.
// NOTE(review): the deviceId parameter line, the try/else structure and parts
// of the FluentIterable chain are not visible in this listing.
520 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
522 List<PortDescription> portDescriptions) {
524 NodeId localNode = clusterService.getLocalNode().id();
525 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
526 // since it will trigger distributed store read.
527 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
528 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
529 // If we don't care much about topology performance, then it might be OK.
530 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
532 // Process port update only if we're the master of the device,
533 // otherwise signal the actual master.
534 List<DeviceEvent> deviceEvents = null;
535 if (localNode.equals(deviceNode)) {
537 final Timestamp newTimestamp;
539 newTimestamp = deviceClockService.getTimestamp(deviceId);
540 } catch (IllegalStateException e) {
541 log.info("Timestamp was not available for device {}", deviceId);
542 log.debug(" discarding {}", portDescriptions);
543 // Failed to generate timestamp.
545 // Possible situation:
546 // Device connected and became master for short period of time,
547 // but lost mastership before this instance had the chance to
548 // retrieve term information.
550 // Information dropped here is expected to be recoverable by
551 // device probing after mastership change
553 return Collections.emptyList();
555 log.debug("timestamp for {} {}", deviceId, newTimestamp);
557 final Timestamped<List<PortDescription>> timestampedInput
558 = new Timestamped<>(portDescriptions, newTimestamp);
559 final Timestamped<List<PortDescription>> merged;
561 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
// Apply the update, then collect the stored (merged) description of each
// input port under the same lock.
563 synchronized (device) {
564 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
565 final DeviceDescriptions descs = device.get(providerId);
566 List<PortDescription> mergedList =
567 FluentIterable.from(portDescriptions)
569 // lookup merged port description
570 descs.getPortDesc(input.portNumber()).value()
572 merged = new Timestamped<>(mergedList, newTimestamp);
// Peers are only told when the local update produced at least one event.
575 if (!deviceEvents.isEmpty()) {
576 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
577 providerId, deviceId);
578 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
582 // FIXME Temporary hack for NPE (ONOS-1171).
583 // Proper fix is to implement forwarding to master on ConfigProvider
585 if (deviceNode == null) {
587 return Collections.emptyList();
// Not the master: forward the raw port descriptions to the master node.
590 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
592 //TODO check unicast return value
593 clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
595 log.warn("Failed to process injected ports of device id: {} " +
596 "(cluster messaging failed: {})",
601 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
// Applies timestamped port descriptions to the provider's stored descriptions
// and to the composed port cache, under the per-device lock.
// Throws IllegalArgumentException (via checkArgument) when the device, its
// descriptions map, or the provider's descriptions are missing.
// Ports not named in this update are handed to pruneOldPorts.
// Null events are filtered out of the returned list.
// NOTE(review): the deviceId parameter line, the newPort declaration and the
// else branch for outdated descriptions are not visible in this listing.
604 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
606 Timestamped<List<PortDescription>> portDescriptions) {
608 Device device = devices.get(deviceId);
609 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
611 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
612 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
614 List<DeviceEvent> events = new ArrayList<>();
615 synchronized (descsMap) {
617 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
618 log.debug("Ignoring outdated events: {}", portDescriptions);
619 return Collections.emptyList();
622 DeviceDescriptions descs = descsMap.get(providerId);
623 // every provider must provide DeviceDescription.
624 checkArgument(descs != null,
625 "Device description for Device ID %s from Provider %s was not found",
626 deviceId, providerId);
628 Map<PortNumber, Port> ports = getPortMap(deviceId);
630 final Timestamp newTimestamp = portDescriptions.timestamp();
// Port numbers seen in this update; everything else is pruned afterwards.
633 Set<PortNumber> processed = new HashSet<>();
634 for (PortDescription portDescription : portDescriptions.value()) {
635 final PortNumber number = portDescription.portNumber();
636 processed.add(number);
638 final Port oldPort = ports.get(number);
// A description with an equal timestamp is also accepted here (>= 0).
642 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
643 if (existingPortDesc == null ||
644 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
645 // on new port or valid update
646 // update description
647 descs.putPortDesc(new Timestamped<>(portDescription,
648 portDescriptions.timestamp()));
649 newPort = composePort(device, number, descsMap);
651 // outdated event, ignored.
655 events.add(oldPort == null ?
656 createPort(device, newPort, ports) :
657 updatePort(device, oldPort, newPort, ports));
660 events.addAll(pruneOldPorts(device, ports, processed));
// updatePort may contribute nulls; drop them before returning.
662 return FluentIterable.from(events).filter(notNull()).toList();
665 // Creates a new port based on the port description adds it to the map and
666 // Returns corresponding event.
667 // Guarded by deviceDescs value (=Device lock)
// Stores newPort in ports under its own port number and returns a
// PORT_ADDED event for the device and port.
668 private DeviceEvent createPort(Device device, Port newPort,
669 Map<PortNumber, Port> ports) {
670 ports.put(newPort.number(), newPort);
671 return new DeviceEvent(PORT_ADDED, device, newPort);
674 // Checks if the specified port requires update and if so, it replaces the
675 // existing entry in the map and returns corresponding event.
676 // Guarded by deviceDescs value (=Device lock)
// An update is required when the enabled state, type, port speed or
// annotations differ between oldPort and newPort.
// NOTE(review): the newPort parameter line and the return for the no-change
// case are not visible in this listing.
677 private DeviceEvent updatePort(Device device, Port oldPort,
679 Map<PortNumber, Port> ports) {
680 if (oldPort.isEnabled() != newPort.isEnabled() ||
681 oldPort.type() != newPort.type() ||
682 oldPort.portSpeed() != newPort.portSpeed() ||
683 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
684 ports.put(oldPort.number(), newPort);
685 return new DeviceEvent(PORT_UPDATED, device, newPort);
690 // Prunes the specified list of ports based on which ports are in the
691 // processed list and returns list of corresponding events.
692 // Guarded by deviceDescs value (=Device lock)
// Walks the port map with an explicit iterator and records a PORT_REMOVED
// event for every port whose number is not in processed.
// NOTE(review): the statement removing the entry and the return statement
// are not visible in this listing.
693 private List<DeviceEvent> pruneOldPorts(Device device,
694 Map<PortNumber, Port> ports,
695 Set<PortNumber> processed) {
696 List<DeviceEvent> events = new ArrayList<>();
697 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
698 while (iterator.hasNext()) {
699 Entry<PortNumber, Port> e = iterator.next();
700 PortNumber portNumber = e.getKey();
701 if (!processed.contains(portNumber)) {
702 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
709 // Gets the map of ports for the specified device; if one does not already
710 // exist, it creates and registers a new one.
// Delegates to createIfAbsentUnchecked on devicePorts, using
// NewConcurrentHashMap.ifNeeded() as the initializer.
711 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
712 return createIfAbsentUnchecked(devicePorts, deviceId,
713 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
// Returns the per-provider descriptions map for a device from deviceDescs.
// Registration uses putIfAbsent; if another thread registered a map first,
// that map is used instead of the local candidate.
// NOTE(review): the deviceId parameter line, the null check, the creation of
// the candidate map and the return statement are not visible in this listing.
716 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
718 Map<ProviderId, DeviceDescriptions> r;
719 r = deviceDescs.get(deviceId);
722 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
723 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
724 if (concurrentlyAdded != null) {
725 r = concurrentlyAdded;
731 // Guarded by deviceDescs value (=Device lock)
// Looks up the provider's DeviceDescriptions in the given map while holding
// its monitor. When a new entry is created it is seeded with deltaDesc and
// stored under providerId.
// NOTE(review): the null check and the return statement are not visible in
// this listing.
732 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
733 Map<ProviderId, DeviceDescriptions> device,
734 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
735 synchronized (device) {
736 DeviceDescriptions r = device.get(providerId);
738 r = new DeviceDescriptions(deltaDesc);
739 device.put(providerId, r);
// Updates the status of a single port reported by a provider.
// The description is timestamped with the device clock and applied through
// updatePortStatusInternal under the per-device lock. The stored (merged)
// description is then passed to notifyPeers in an InternalPortStatusEvent.
// NOTE(review): the deviceId parameter line, the try header, the condition
// guarding the notification and the return statements are not visible in
// this listing.
746 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
748 PortDescription portDescription) {
749 final Timestamp newTimestamp;
751 newTimestamp = deviceClockService.getTimestamp(deviceId);
752 } catch (IllegalStateException e) {
753 log.info("Timestamp was not available for device {}", deviceId);
754 log.debug(" discarding {}", portDescription);
755 // Failed to generate timestamp. Ignoring.
756 // See updatePorts comment
759 final Timestamped<PortDescription> deltaDesc
760 = new Timestamped<>(portDescription, newTimestamp);
761 final DeviceEvent event;
762 final Timestamped<PortDescription> mergedDesc;
763 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
// Read the stored description under the same lock that applied the update.
764 synchronized (device) {
765 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
766 mergedDesc = device.get(providerId)
767 .getPortDesc(portDescription.portNumber());
770 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
771 providerId, deviceId);
772 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
// Applies a timestamped single-port description under the per-device lock.
// Throws IllegalArgumentException (via checkArgument) when the device or its
// descriptions map is missing, and fails verify() when the provider has no
// stored descriptions.
// The description is stored only when the port is new for this provider or
// deltaDesc is newer than the stored one. The result comes from createPort
// or updatePort depending on whether the port was already cached.
// NOTE(review): the newPort declaration, the else branch and the
// early-return statements are not visible in this listing.
777 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
778 Timestamped<PortDescription> deltaDesc) {
779 Device device = devices.get(deviceId);
780 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
782 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
783 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
785 synchronized (descsMap) {
787 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
788 log.debug("Ignoring outdated event: {}", deltaDesc);
792 DeviceDescriptions descs = descsMap.get(providerId);
793 // assuming all providers must give DeviceDescription
794 verify(descs != null,
795 "Device description for Device ID %s from Provider %s was not found",
796 deviceId, providerId);
798 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
799 final PortNumber number = deltaDesc.value().portNumber();
800 final Port oldPort = ports.get(number);
// Unlike updatePortsInternal, a description with an equal timestamp is not
// accepted here (isNewer rather than compareTo >= 0).
803 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
804 if (existingPortDesc == null ||
805 deltaDesc.isNewer(existingPortDesc)) {
806 // on new port or valid update
807 // update description
808 descs.putPortDesc(deltaDesc);
809 newPort = composePort(device, number, descsMap);
811 // same or outdated event, ignored.
812 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
816 if (oldPort == null) {
817 return createPort(device, newPort, ports);
819 return updatePort(device, oldPort, newPort, ports);
// Returns an immutable copy of the cached ports of a device; an empty list
// is returned on the fallback path.
// NOTE(review): the condition guarding the empty-list return is not visible
// in this listing (presumably a null check on ports — confirm).
825 public List<Port> getPorts(DeviceId deviceId) {
826 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
828 return Collections.emptyList();
830 return ImmutableList.copyOf(ports.values());
// Stores the new port statistics for a device and the deltas against the
// previously stored statistics.
// When previous statistics exist for the device, each port gets a delta from
// calcDeltaStats, or a delta from an untouched builder if that port had no
// previous entry. Both resulting maps are then put into their eventually
// consistent maps.
// NOTE(review): the else keyword separating the two loops and the return
// statement are not visible in this listing.
834 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
835 Collection<PortStatistics> newStatsCollection) {
837 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
838 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
839 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
841 if (prvStatsMap != null) {
842 for (PortStatistics newStats : newStatsCollection) {
843 PortNumber port = PortNumber.portNumber(newStats.port());
844 PortStatistics prvStats = prvStatsMap.get(port);
// Default delta used when this port has no previous statistics.
845 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
846 PortStatistics deltaStats = builder.build();
847 if (prvStats != null) {
848 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
850 deltaStatsMap.put(port, deltaStats);
851 newStatsMap.put(port, newStats);
// This loop only records the new statistics; no deltas are computed.
854 for (PortStatistics newStats : newStatsCollection) {
855 PortNumber port = PortNumber.portNumber(newStats.port());
856 newStatsMap.put(port, newStats);
859 devicePortDeltaStats.put(deviceId, deltaStatsMap);
860 devicePortStats.put(deviceId, newStatsMap);
861 // DeviceEvent returns null because of InternalPortStatsListener usage
866 * Calculate delta statistics by subtracting previous from new statistics.
868 * @param deviceId device identifier
869 * @param prvStats previous port statistics
870 * @param newStats new port statistics
871 * @return PortStatistics
// Builds a DefaultPortStatistics whose counters and duration are the
// field-by-field differences newStats - prvStats, for the given device and
// the port of newStats.
// NOTE(review): the else keyword, the final build() call and the return
// statement are not visible in this listing.
873 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
874 // calculate time difference
875 long deltaStatsSec, deltaStatsNano;
// Borrow one second when the new nanosecond part is smaller than the old one.
876 if (newStats.durationNano() < prvStats.durationNano()) {
877 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
878 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
880 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
881 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
883 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
884 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
885 .setPort(newStats.port())
886 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
887 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
888 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
889 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
890 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
891 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
892 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
893 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
894 .setDurationSec(deltaStatsSec)
895 .setDurationNano(deltaStatsNano)
901 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
902 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
903 if (portStats == null) {
904 return Collections.emptyList();
906 return ImmutableList.copyOf(portStats.values());
910 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
911 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
912 if (portStats == null) {
913 return Collections.emptyList();
915 return ImmutableList.copyOf(portStats.values());
919 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
920 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
921 return ports == null ? null : ports.get(portNumber);
925 public boolean isAvailable(DeviceId deviceId) {
926 return availableDevices.contains(deviceId);
// Removes a device, coordinating with the cluster's mastership state. Visible flow:
//  1. read the current master for the device;
//  2. if there is none, request a role for this node and re-read the mastership term;
//  3. if another node is master, send the deviceId to it under DEVICE_REMOVE_REQ;
//  4. remove the device locally with a fresh timestamp, broadcast an
//     InternalDeviceRemovedEvent, and optionally relinquish mastership.
// NOTE(review): the embedded source numbering has gaps (e.g. 947-950, 962-966, 977-979), so
// short statements such as early returns and closing braces are not shown in this listing;
// presumably step 3 returns before step 4 runs -- confirm against the full file.
930 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
931 final NodeId myId = clusterService.getLocalNode().id();
932 NodeId master = mastershipService.getMasterFor(deviceId);
934 // if there exists a master, forward
935 // if there is no master, try to become one and process
937 boolean relinquishAtEnd = false;
938 if (master == null) {
939 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
// NOTE(review): the flag is set when the local role is NOT NONE, while the log messages below
// speak of a "temporary" role -- confirm that this condition is the intended one.
940 if (myRole != MastershipRole.NONE) {
941 relinquishAtEnd = true;
943 log.debug("Temporarily requesting role for {} to remove", deviceId);
944 mastershipService.requestRoleFor(deviceId);
// re-read the term to see whether this node is now recorded as master
945 MastershipTerm term = termService.getMastershipTerm(deviceId);
946 if (term != null && myId.equals(term.master())) {
// some other node (or still nobody) is master: hand the request over
951 if (!myId.equals(master)) {
952 log.debug("{} has control of {}, forwarding remove request",
955 // TODO check unicast return value
956 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
958 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
961 // event will be triggered after master processes it.
// local removal path: timestamp comes from the device clock service
967 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
968 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
970 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
972 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
974 if (relinquishAtEnd) {
975 log.debug("Relinquishing temporary role acquired for {}", deviceId);
976 mastershipService.relinquishMastership(deviceId);
// Applies a device removal carrying the given timestamp to the local store.
// While holding the monitor of the device's provider-description map it: compares the
// timestamp with the primary provider's latest timestamp, records the removal in
// removalRequest, removes the Device from devices and calls markOfflineInternal.
// Returns a DEVICE_REMOVED event for the removed Device, or null if no Device was stored.
// NOTE(review): the bodies of the "primDescs == null" and "outdated" branches, and whatever
// is done with the port map, fall on lines this listing omits (990-992, 996-997, 1004-1005).
981 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
982 Timestamp timestamp) {
984 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
985 synchronized (descs) {
986 // accept removal request if given timestamp is newer than
987 // the latest Timestamp from Primary provider
988 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
989 if (primDescs == null) {
993 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
// a timestamp equal to or older than the latest known one is treated as outdated
994 if (timestamp.compareTo(lastTimestamp) <= 0) {
995 // outdated event ignore
// remember the removal so isDeviceRemoved can compare later events against it
998 removalRequest.put(deviceId, timestamp);
1000 Device device = devices.remove(deviceId);
1001 // should DEVICE_REMOVED carry removed ports?
1002 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1003 if (ports != null) {
1006 markOfflineInternal(deviceId, timestamp);
// null when the device was not present in the devices map
1008 return device == null ? null :
1009 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1014 * Checks if given timestamp is superseded by removal request
1015 * with more recent timestamp.
1017 * @param deviceId identifier of a device
1018 * @param timestampToCheck timestamp of an event to check
1019 * @return true if device is already removed
1021 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1022 Timestamp removalTimestamp = removalRequest.get(deviceId);
1023 if (removalTimestamp != null &&
1024 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1025 // removalRequest is more recent
1032 * Returns a Device, merging description given from multiple Providers.
1034 * @param deviceId device identifier
1035 * @param providerDescs Collection of Descriptions from multiple providers
1036 * @return Device instance
// The provider chosen by pickPrimaryPID supplies type, manufacturer, versions, serial number
// and chassis id; annotations start from that provider and are then merged with the
// annotations of every provider entry that reaches the merge call in the loop below.
// Rejects an empty description map through checkArgument.
1038 private Device composeDevice(DeviceId deviceId,
1039 Map<ProviderId, DeviceDescriptions> providerDescs) {
1041 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1043 ProviderId primary = pickPrimaryPID(providerDescs);
1045 DeviceDescriptions desc = providerDescs.get(primary);
// base attributes are taken from the primary provider's description only
1047 final DeviceDescription base = desc.getDeviceDesc().value();
1048 Type type = base.type();
1049 String manufacturer = base.manufacturer();
1050 String hwVersion = base.hwVersion();
1051 String swVersion = base.swVersion();
1052 String serialNumber = base.serialNumber();
1053 ChassisId chassisId = base.chassisId();
1054 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1055 annotations = merge(annotations, base.annotations());
1057 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
// NOTE(review): the body of this check is on a line the listing omits; presumably it skips
// the primary entry, which was merged above -- confirm against the full file.
1058 if (e.getKey().equals(primary)) {
1061 // Note: should keep track of Description timestamp in the future
1062 // and only merge conflicting keys when timestamp is newer.
1063 // Currently assuming there will never be a key conflict between
1066 // annotation merging. not so efficient, should revisit later
1067 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1070 return new DefaultDevice(primary, deviceId, type, manufacturer,
1071 hwVersion, swVersion, serialNumber,
1072 chassisId, annotations);
// Creates the Port implementation matching description.type(): an OmsPort, OchPort or
// OduCltPort built from the correspondingly cast description, or a DefaultPort carrying
// the description's type and port speed.
// NOTE(review): the case labels of the switch are on lines this listing omits
// (1078, 1082, 1086, 1089), so which type value selects which branch is not visible here.
1075 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1076 PortDescription description, Annotations annotations) {
1077 switch (description.type()) {
// OMS-style port: frequency range and grid come from the description
1079 OmsPortDescription omsDesc = (OmsPortDescription) description;
1080 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1081 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
// OCh-style port: signal type, tunability and lambda come from the description
1083 OchPortDescription ochDesc = (OchPortDescription) description;
1084 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1085 ochDesc.isTunable(), ochDesc.lambda(), annotations);
// ODU client port: only the signal type comes from the description
1087 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1088 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
// remaining branch: generic port with the described type and speed
1090 return new DefaultPort(device, number, isEnabled, description.type(),
1091 description.portSpeed(), annotations);
1096 * Returns a Port, merging description given from multiple Providers.
1098 * @param device device the port is on
1099 * @param number port number
1100 * @param descsMap Collection of Descriptions from multiple providers
1101 * @return Port instance
// The enabled flag and the initial annotations come from the primary provider's port
// description when it has one. The loop then merges annotations from other providers'
// descriptions of the same port and rebuilds the port via buildTypedPort, tracking the
// newest timestamp seen so far in "newest".
1103 private Port composePort(Device device, PortNumber number,
1104 Map<ProviderId, DeviceDescriptions> descsMap) {
1106 ProviderId primary = pickPrimaryPID(descsMap);
1107 DeviceDescriptions primDescs = descsMap.get(primary);
1108 // if no primary, assume not enabled
1109 boolean isEnabled = false;
1110 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1111 Timestamp newest = null;
// NOTE(review): primDescs is dereferenced here with no visible null check; pickPrimaryPID
// returns its null-initialised fallback when the map has no entries -- confirm callers never
// pass an empty map.
1112 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1113 if (portDesc != null) {
1114 isEnabled = portDesc.value().isEnabled();
1115 annotations = merge(annotations, portDesc.value().annotations());
1116 newest = portDesc.timestamp();
// "updated" stays null unless some provider entry reaches the buildTypedPort call in the loop
1118 Port updated = null;
1119 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1120 if (e.getKey().equals(primary)) {
1123 // Note: should keep track of Description timestamp in the future
1124 // and only merge conflicting keys when timestamp is newer.
1125 // Currently assuming there will never be a key conflict between
1128 // annotation merging. not so efficient, should revisit later
1129 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1130 if (otherPortDesc != null) {
// NOTE(review): the body of this timestamp check is not visible in the listing; presumably
// descriptions older than "newest" are skipped -- confirm.
1131 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1134 annotations = merge(annotations, otherPortDesc.value().annotations());
1135 PortDescription other = otherPortDesc.value();
1136 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1137 newest = otherPortDesc.timestamp();
// no description from the primary provider: fall back to a disabled DefaultPort unless
// another provider produced a port above
1140 if (portDesc == null) {
1141 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1143 PortDescription current = portDesc.value();
1144 return updated == null
1145 ? buildTypedPort(device, number, isEnabled, current, annotations)
1150 * @return primary ProviderID, or randomly chosen one if none exists
// Scans the provider entries: a non-ancillary key is treated as primary, and the first
// ancillary key seen is kept as the fallback. The fallback is what the final statement
// returns; it is still null when the map has no entries.
// NOTE(review): the body of the non-ancillary branch is on a line this listing omits (1157);
// presumably it returns that key immediately -- confirm.
1152 private ProviderId pickPrimaryPID(
1153 Map<ProviderId, DeviceDescriptions> providerDescs) {
1154 ProviderId fallBackPrimary = null;
1155 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1156 if (!e.getKey().isAncillary()) {
1158 } else if (fallBackPrimary == null) {
1159 // pick randomly as a fallback in case there is no primary
// (in practice: the first ancillary key in the map's iteration order)
1160 fallBackPrimary = e.getKey();
1163 return fallBackPrimary;
1166 private DeviceDescriptions getPrimaryDescriptions(
1167 Map<ProviderId, DeviceDescriptions> providerDescs) {
1168 ProviderId pid = pickPrimaryPID(providerDescs);
1169 return providerDescs.get(pid);
1172 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1173 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1176 private void broadcastMessage(MessageSubject subject, Object event) {
1177 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1180 private void notifyPeers(InternalDeviceEvent event) {
1181 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1184 private void notifyPeers(InternalDeviceOfflineEvent event) {
1185 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1188 private void notifyPeers(InternalDeviceRemovedEvent event) {
1189 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1192 private void notifyPeers(InternalPortEvent event) {
1193 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1196 private void notifyPeers(InternalPortStatusEvent event) {
1197 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1200 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1202 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1203 } catch (IOException e) {
1204 log.error("Failed to send" + event + " to " + recipient, e);
1208 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1210 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1211 } catch (IOException e) {
1212 log.error("Failed to send" + event + " to " + recipient, e);
1216 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1218 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1219 } catch (IOException e) {
1220 log.error("Failed to send" + event + " to " + recipient, e);
1224 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1226 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1227 } catch (IOException e) {
1228 log.error("Failed to send" + event + " to " + recipient, e);
1232 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1234 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1235 } catch (IOException e) {
1236 log.error("Failed to send" + event + " to " + recipient, e);
1240 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1241 final NodeId self = clusterService.getLocalNode().id();
1243 final int numDevices = deviceDescs.size();
1244 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1245 final int portsPerDevice = 8; // random factor to minimize reallocation
1246 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1247 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1249 deviceDescs.forEach((deviceId, devDescs) -> {
1251 // for each Device...
1252 synchronized (devDescs) {
1254 // send device offline timestamp
1255 Timestamp lOffline = this.offline.get(deviceId);
1256 if (lOffline != null) {
1257 adOffline.put(deviceId, lOffline);
1260 for (Entry<ProviderId, DeviceDescriptions>
1261 prov : devDescs.entrySet()) {
1263 // for each Provider Descriptions...
1264 final ProviderId provId = prov.getKey();
1265 final DeviceDescriptions descs = prov.getValue();
1267 adDevices.put(new DeviceFragmentId(deviceId, provId),
1268 descs.getDeviceDesc().timestamp());
1270 for (Entry<PortNumber, Timestamped<PortDescription>>
1271 portDesc : descs.getPortDescs().entrySet()) {
1273 final PortNumber number = portDesc.getKey();
1274 adPorts.put(new PortFragmentId(deviceId, provId, number),
1275 portDesc.getValue().timestamp());
1281 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1285 * Responds to anti-entropy advertisement message.
1287 * Notify sender about out-dated information using regular replication message.
1288 * Send back advertisement to sender if not in sync.
1290 * @param advertisement to respond to
// Works on private copies of the advertised maps. Entries are removed from the copies as the
// matching local fragments are processed, so whatever remains afterwards is known only to the
// sender. Local fragments that the sender lacks or has an older timestamp for are pushed back
// with notifyPeer; fragments where the local timestamp differs without being newer are
// collected in reqDevices / reqPorts.
1292 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1294 final NodeId sender = advertisement.sender();
// mutable copies: processed entries are removed below
1296 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1297 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1298 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1300 // Fragments to request
1301 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1302 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1304 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1305 final DeviceId deviceId = de.getKey();
1306 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1308 synchronized (lDevice) {
1309 // latestTimestamp across provider
1310 // Note: can be null initially
1311 Timestamp localLatest = offline.get(deviceId);
1313 // handle device Ads
1314 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1315 final ProviderId provId = prov.getKey();
1316 final DeviceDescriptions lDeviceDescs = prov.getValue();
1318 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1321 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1322 Timestamp advDevTimestamp = devAds.get(devFragId);
1324 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1326 // remote does not have it or outdated, suggest
1327 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1328 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1329 // local is outdated, request
1330 reqDevices.add(devFragId);
// same comparison, per port description of this provider
1334 for (Entry<PortNumber, Timestamped<PortDescription>>
1335 pe : lDeviceDescs.getPortDescs().entrySet()) {
1337 final PortNumber num = pe.getKey();
1338 final Timestamped<PortDescription> lPort = pe.getValue();
1340 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1342 Timestamp advPortTimestamp = portAds.get(portFragId);
1343 if (advPortTimestamp == null || lPort.isNewerThan(
1344 advPortTimestamp)) {
1345 // remote does not have it or outdated, suggest
1346 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1347 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1348 // local is outdated, request
1349 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1350 reqPorts.add(portFragId);
1353 // remove port Ad already processed
1354 portAds.remove(portFragId);
1355 } // end local port loop
1357 // remove device Ad already processed
1358 devAds.remove(devFragId);
1360 // find latest and update
1361 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1362 if (localLatest == null ||
1363 providerLatest.compareTo(localLatest) > 0) {
1364 localLatest = providerLatest;
1366 } // end local provider loop
1368 // checking if remote timestamp is more recent.
1369 Timestamp rOffline = offlineAds.get(deviceId);
// NOTE(review): localLatest is still null here when the device has no offline timestamp and
// its provider map has no entries; compareTo would then receive null -- confirm the provider
// map is never empty or that Timestamp.compareTo tolerates null.
1370 if (rOffline != null &&
1371 rOffline.compareTo(localLatest) > 0) {
1372 // remote offline timestamp suggests that the
1373 // device is off-line
1374 markOfflineInternal(deviceId, rOffline);
// re-read: markOfflineInternal above may have been called for this device
1377 Timestamp lOffline = offline.get(deviceId);
1378 if (lOffline != null && rOffline == null) {
1379 // locally offline, but remote is online, suggest offline
1380 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1383 // remove device offline Ad already processed
1384 offlineAds.remove(deviceId);
1385 } // end local device loop
1388 // If there is any Ads left, request them
1389 log.trace("Ads left {}, {}", devAds, portAds);
1390 reqDevices.addAll(devAds.keySet());
1391 reqPorts.addAll(portAds.keySet());
1393 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1394 log.trace("Nothing to request to remote peer {}", sender);
1398 log.debug("Need to sync {} {}", reqDevices, reqPorts);
1400 // 2-way Anti-Entropy for now
// the reply is this node's own full advertisement, not a request listing reqDevices/reqPorts
1402 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1403 } catch (IOException e) {
1404 log.error("Failed to send response advertisement to " + sender, e);
1407 // Sketch of 3-way Anti-Entropy
1408 // DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1409 // ClusterMessage message = new ClusterMessage(
1410 // clusterService.getLocalNode().id(),
1411 // GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1412 // SERIALIZER.encode(request));
1415 // clusterCommunicator.unicast(message, advertisement.sender());
1416 // } catch (IOException e) {
1417 // log.error("Failed to send advertisement reply to "
1418 // + advertisement.sender(), e);
1422 private void notifyDelegateIfNotNull(DeviceEvent event) {
1423 if (event != null) {
1424 notifyDelegate(event);
// Runnable that sends this node's anti-entropy advertisement to one randomly chosen peer.
// Visible steps: check the thread's interrupt flag, collect the node ids of the cluster,
// stop when this node is the only one, draw random indexes until the pick differs from this
// node, build the advertisement, re-check the interrupt flag, and send it via unicastMessage.
// NOTE(review): the run() header, the early-return statements and the head of the do/while
// loop fall on lines this listing omits (1429-1431, 1434-1437, 1447-1451).
1428 private final class SendAdvertisementTask implements Runnable {
1432 if (Thread.currentThread().isInterrupted()) {
1433 log.debug("Interrupted, quitting");
1438 final NodeId self = clusterService.getLocalNode().id();
1439 Set<ControllerNode> nodes = clusterService.getNodes();
1441 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1442 .transform(toNodeId())
// nothing to gossip with when the only known node is this one
1445 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1446 log.trace("No other peers in the cluster.");
// redraw until the selected node is not this node
1452 int idx = RandomUtils.nextInt(0, nodeIds.size());
1453 peer = nodeIds.get(idx);
1454 } while (peer.equals(self));
1456 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1458 if (Thread.currentThread().isInterrupted()) {
1459 log.debug("Interrupted, quitting");
1464 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1465 } catch (IOException e) {
// send failures are logged at debug level, without the exception itself
1466 log.debug("Failed to send anti-entropy advertisement to {}", peer);
1469 } catch (Exception e) {
1470 // catch all Exception to avoid Scheduled task being suppressed.
1471 log.error("Exception thrown while sending advertisement", e);
1476 private final class InternalDeviceEventListener
1477 implements ClusterMessageHandler {
1479 public void handle(ClusterMessage message) {
1480 log.debug("Received device update event from peer: {}", message.sender());
1481 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1483 ProviderId providerId = event.providerId();
1484 DeviceId deviceId = event.deviceId();
1485 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1488 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1489 } catch (Exception e) {
1490 log.warn("Exception thrown handling device update", e);
1495 private final class InternalDeviceOfflineEventListener
1496 implements ClusterMessageHandler {
1498 public void handle(ClusterMessage message) {
1499 log.debug("Received device offline event from peer: {}", message.sender());
1500 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1502 DeviceId deviceId = event.deviceId();
1503 Timestamp timestamp = event.timestamp();
1506 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1507 } catch (Exception e) {
1508 log.warn("Exception thrown handling device offline", e);
// Cluster message handler for device remove requests sent by a peer; the payload decodes
// to the DeviceId to remove. Exceptions raised while handling are logged at warn level.
// NOTE(review): the statements between the decode and the catch (lines 1519-1521) are not
// visible in this listing; presumably the request is passed to removeDevice -- confirm.
1513 private final class InternalRemoveRequestListener
1514 implements ClusterMessageHandler {
1516 public void handle(ClusterMessage message) {
1517 log.debug("Received device remove request from peer: {}", message.sender());
1518 DeviceId did = SERIALIZER.decode(message.payload());
1522 } catch (Exception e) {
1523 log.warn("Exception thrown handling device remove", e);
1528 private final class InternalDeviceRemovedEventListener
1529 implements ClusterMessageHandler {
1531 public void handle(ClusterMessage message) {
1532 log.debug("Received device removed event from peer: {}", message.sender());
1533 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1535 DeviceId deviceId = event.deviceId();
1536 Timestamp timestamp = event.timestamp();
1539 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1540 } catch (Exception e) {
1541 log.warn("Exception thrown handling device removed", e);
// Cluster message handler for port update events received from a peer: decodes the event,
// checks that the device is known locally, and passes the result of updatePortsInternal to
// notifyDelegate. Exceptions raised while applying the update are logged at warn level.
// NOTE(review): the statement that ends the "device not found" branch and the opening of the
// try block fall on lines this listing omits (1561-1564).
1546 private final class InternalPortEventListener
1547 implements ClusterMessageHandler {
1549 public void handle(ClusterMessage message) {
1551 log.debug("Received port update event from peer: {}", message.sender());
1552 InternalPortEvent event = SERIALIZER.decode(message.payload());
1554 ProviderId providerId = event.providerId();
1555 DeviceId deviceId = event.deviceId();
1556 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
// port information for a device this node has not stored yet is not applied
1558 if (getDevice(deviceId) == null) {
1559 log.debug("{} not found on this node yet, ignoring.", deviceId);
1560 // Note: dropped information will be recovered by anti-entropy
1565 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1566 } catch (Exception e) {
1567 log.warn("Exception thrown handling port update", e);
// Cluster message handler for port status events received from a peer: decodes the event,
// checks that the device is known locally, and forwards a non-null result of
// updatePortStatusInternal to the delegate. Exceptions are logged at warn level.
// NOTE(review): the statement that ends the "device not found" branch and the opening of the
// try block fall on lines this listing omits (1587-1590).
1572 private final class InternalPortStatusEventListener
1573 implements ClusterMessageHandler {
1575 public void handle(ClusterMessage message) {
1577 log.debug("Received port status update event from peer: {}", message.sender());
1578 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1580 ProviderId providerId = event.providerId();
1581 DeviceId deviceId = event.deviceId();
1582 Timestamped<PortDescription> portDescription = event.portDescription();
// port status for a device this node has not stored yet is not applied
1584 if (getDevice(deviceId) == null) {
1585 log.debug("{} not found on this node yet, ignoring.", deviceId);
1586 // Note: dropped information will be recovered by anti-entropy
1591 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1592 } catch (Exception e) {
// NOTE(review): this message is identical to the one used by the port update listener, so the
// two cannot be told apart in logs -- consider a distinct text.
1593 log.warn("Exception thrown handling port update", e);
1598 private final class InternalDeviceAdvertisementListener
1599 implements ClusterMessageHandler {
1601 public void handle(ClusterMessage message) {
1602 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1603 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1605 handleAdvertisement(advertisement);
1606 } catch (Exception e) {
1607 log.warn("Exception thrown handling Device advertisements.", e);
// Cluster message handler for injected device events received from a peer: decodes the
// event and calls createOrUpdateDevice with its provider id, device id and description.
// Exceptions raised by that call are logged at warn level.
// NOTE(review): the statement that ends the "timestamp not available" branch and the opening
// of the try block fall on lines this listing omits (1625-1628).
1612 private final class DeviceInjectedEventListener
1613 implements ClusterMessageHandler {
1615 public void handle(ClusterMessage message) {
1616 log.debug("Received injected device event from peer: {}", message.sender());
1617 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1619 ProviderId providerId = event.providerId();
1620 DeviceId deviceId = event.deviceId();
1621 DeviceDescription deviceDescription = event.deviceDescription();
// the device clock service cannot issue a timestamp for this device yet
1622 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1623 // workaround for ONOS-1208
1624 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1629 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1630 } catch (Exception e) {
1631 log.warn("Exception thrown handling device injected event.", e);
// Cluster message handler for injected port events received from a peer: decodes the event
// and calls updatePorts with its provider id, device id and port descriptions.
// Exceptions raised by that call are logged at warn level.
// NOTE(review): the statement that ends the "timestamp not available" branch and the opening
// of the try block fall on lines this listing omits (1649-1652).
1636 private final class PortInjectedEventListener
1637 implements ClusterMessageHandler {
1639 public void handle(ClusterMessage message) {
1640 log.debug("Received injected port event from peer: {}", message.sender());
1641 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1643 ProviderId providerId = event.providerId();
1644 DeviceId deviceId = event.deviceId();
1645 List<PortDescription> portDescriptions = event.portDescriptions();
// the device clock service cannot issue a timestamp for this device yet
1646 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1647 // workaround for ONOS-1208
1648 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1653 updatePorts(providerId, deviceId, portDescriptions);
1654 } catch (Exception e) {
1655 log.warn("Exception thrown handling port injected event.", e);
// Listener on the eventually consistent map of per-device port statistics.
// For a PUT event whose key resolves to a Device in the devices map, it notifies the
// delegate with a PORT_STATS_UPDATED DeviceEvent for that device.
// NOTE(review): delegate.notify is called directly here; confirm the delegate cannot be null
// when map events arrive.
1660 private class InternalPortStatsListener
1661 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1663 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1664 if (event.type() == PUT) {
// event.key() is the DeviceId whose statistics entry was written
1665 Device device = devices.get(event.key());
1666 if (device != null) {
1667 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));