2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.device.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
23 import org.apache.commons.lang3.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onlab.packet.ChassisId;
31 import org.onlab.util.KryoNamespace;
32 import org.onlab.util.NewConcurrentHashMap;
33 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.mastership.MastershipTerm;
38 import org.onosproject.mastership.MastershipTermService;
39 import org.onosproject.net.Annotations;
40 import org.onosproject.net.AnnotationsUtil;
41 import org.onosproject.net.DefaultAnnotations;
42 import org.onosproject.net.DefaultDevice;
43 import org.onosproject.net.DefaultPort;
44 import org.onosproject.net.Device;
45 import org.onosproject.net.Device.Type;
46 import org.onosproject.net.DeviceId;
47 import org.onosproject.net.MastershipRole;
48 import org.onosproject.net.OchPort;
49 import org.onosproject.net.OduCltPort;
50 import org.onosproject.net.OmsPort;
51 import org.onosproject.net.Port;
52 import org.onosproject.net.PortNumber;
53 import org.onosproject.net.device.DefaultPortStatistics;
54 import org.onosproject.net.device.DeviceClockService;
55 import org.onosproject.net.device.DeviceDescription;
56 import org.onosproject.net.device.DeviceEvent;
57 import org.onosproject.net.device.DeviceStore;
58 import org.onosproject.net.device.DeviceStoreDelegate;
59 import org.onosproject.net.device.OchPortDescription;
60 import org.onosproject.net.device.OduCltPortDescription;
61 import org.onosproject.net.device.OmsPortDescription;
62 import org.onosproject.net.device.PortDescription;
63 import org.onosproject.net.device.PortStatistics;
64 import org.onosproject.net.provider.ProviderId;
65 import org.onosproject.store.AbstractStore;
66 import org.onosproject.store.Timestamp;
67 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
68 import org.onosproject.store.cluster.messaging.ClusterMessage;
69 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
70 import org.onosproject.store.cluster.messaging.MessageSubject;
71 import org.onosproject.store.impl.Timestamped;
72 import org.onosproject.store.serializers.KryoNamespaces;
73 import org.onosproject.store.serializers.KryoSerializer;
74 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 import org.onosproject.store.service.EventuallyConsistentMap;
76 import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 import org.onosproject.store.service.EventuallyConsistentMapListener;
78 import org.onosproject.store.service.MultiValuedTimestamp;
79 import org.onosproject.store.service.StorageService;
80 import org.onosproject.store.service.WallClockTimestamp;
81 import org.slf4j.Logger;
83 import java.io.IOException;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.List;
92 import java.util.Map.Entry;
93 import java.util.Objects;
95 import java.util.concurrent.ConcurrentMap;
96 import java.util.concurrent.ExecutorService;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledExecutorService;
99 import java.util.concurrent.TimeUnit;
101 import static com.google.common.base.Preconditions.checkArgument;
102 import static com.google.common.base.Predicates.notNull;
103 import static com.google.common.base.Verify.verify;
104 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
105 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
106 import static org.onlab.util.Tools.groupedThreads;
107 import static org.onlab.util.Tools.minPriority;
108 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
109 import static org.onosproject.net.DefaultAnnotations.merge;
110 import static org.onosproject.net.device.DeviceEvent.Type.*;
111 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
112 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
113 import static org.slf4j.LoggerFactory.getLogger;
/**
 * Manages inventory of infrastructure devices using gossip protocol to distribute
 * information.
 */
119 @Component(immediate = true)
121 public class GossipDeviceStore
122 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
123 implements DeviceStore {
125 private final Logger log = getLogger(getClass());
127 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
128 // Timeout in milliseconds to process device or ports on remote master node
129 private static final int REMOTE_MASTER_TIMEOUT = 1000;
131 // innerMap is used to lock a Device, thus instance should never be replaced.
132 // collection of Description given from various providers
133 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
134 deviceDescs = Maps.newConcurrentMap();
136 // cache of Device and Ports generated by compositing descriptions from providers
137 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
138 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
140 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
141 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
142 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
143 portStatsListener = new InternalPortStatsListener();
145 // to be updated under Device lock
146 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
147 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
149 // available(=UP) devices
150 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
153 protected DeviceClockService deviceClockService;
155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
156 protected StorageService storageService;
158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
159 protected ClusterCommunicationService clusterCommunicator;
161 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
162 protected ClusterService clusterService;
164 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
165 protected MastershipService mastershipService;
167 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168 protected MastershipTermService termService;
171 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
173 protected void setupKryoPool() {
174 serializerPool = KryoNamespace.newBuilder()
175 .register(DistributedStoreSerializers.STORE_COMMON)
176 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
177 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
178 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
179 .register(InternalDeviceRemovedEvent.class)
180 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
181 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
182 .register(DeviceAntiEntropyAdvertisement.class)
183 .register(DeviceFragmentId.class)
184 .register(PortFragmentId.class)
185 .register(DeviceInjectedEvent.class)
186 .register(PortInjectedEvent.class)
191 private ExecutorService executor;
193 private ScheduledExecutorService backgroundExecutor;
195 // TODO make these anti-entropy parameters configurable
196 private long initialDelaySec = 5;
197 private long periodSec = 5;
200 public void activate() {
201 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
204 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
206 clusterCommunicator.addSubscriber(
207 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
208 clusterCommunicator.addSubscriber(
209 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
210 new InternalDeviceOfflineEventListener(),
212 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
213 new InternalRemoveRequestListener(),
215 clusterCommunicator.addSubscriber(
216 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
217 clusterCommunicator.addSubscriber(
218 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
219 clusterCommunicator.addSubscriber(
220 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
221 clusterCommunicator.addSubscriber(
222 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
223 new InternalDeviceAdvertisementListener(),
225 clusterCommunicator.addSubscriber(
226 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
227 clusterCommunicator.addSubscriber(
228 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
230 // start anti-entropy thread
231 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
232 initialDelaySec, periodSec, TimeUnit.SECONDS);
234 // Create a distributed map for port stats.
235 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
236 .register(KryoNamespaces.API)
237 .register(DefaultPortStatistics.class)
238 .register(DeviceId.class)
239 .register(MultiValuedTimestamp.class)
240 .register(WallClockTimestamp.class);
242 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
243 .withName("port-stats")
244 .withSerializer(deviceDataSerializer)
245 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247 .withTombstonesDisabled()
249 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
250 eventuallyConsistentMapBuilder()
251 .withName("port-stats-delta")
252 .withSerializer(deviceDataSerializer)
253 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
254 .withTimestampProvider((k, v) -> new WallClockTimestamp())
255 .withTombstonesDisabled()
257 devicePortStats.addListener(portStatsListener);
262 public void deactivate() {
263 devicePortStats.destroy();
264 devicePortDeltaStats.destroy();
265 executor.shutdownNow();
267 backgroundExecutor.shutdownNow();
269 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
270 log.error("Timeout during executor shutdown");
272 } catch (InterruptedException e) {
273 log.error("Error during executor shutdown", e);
279 availableDevices.clear();
284 public int getDeviceCount() {
285 return devices.size();
289 public Iterable<Device> getDevices() {
290 return Collections.unmodifiableCollection(devices.values());
294 public Iterable<Device> getAvailableDevices() {
295 return FluentIterable.from(getDevices())
296 .filter(input -> isAvailable(input.id()));
300 public Device getDevice(DeviceId deviceId) {
301 return devices.get(deviceId);
305 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
307 DeviceDescription deviceDescription) {
308 NodeId localNode = clusterService.getLocalNode().id();
309 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
311 // Process device update only if we're the master,
312 // otherwise signal the actual master.
313 DeviceEvent deviceEvent = null;
314 if (localNode.equals(deviceNode)) {
316 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
317 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
318 final Timestamped<DeviceDescription> mergedDesc;
319 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
321 synchronized (device) {
322 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
323 mergedDesc = device.get(providerId).getDeviceDesc();
326 if (deviceEvent != null) {
327 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
328 providerId, deviceId);
329 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
333 // FIXME Temporary hack for NPE (ONOS-1171).
334 // Proper fix is to implement forwarding to master on ConfigProvider
336 if (deviceNode == null) {
342 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
343 providerId, deviceId, deviceDescription);
345 // TODO check unicast return value
346 clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
348 log.warn("Failed to process injected device id: {} desc: {} " +
349 "(cluster messaging failed: {})",
350 deviceId, deviceDescription, e);
357 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
359 Timestamped<DeviceDescription> deltaDesc) {
361 // Collection of DeviceDescriptions for a Device
362 Map<ProviderId, DeviceDescriptions> device
363 = getOrCreateDeviceDescriptionsMap(deviceId);
365 synchronized (device) {
366 // locking per device
368 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
369 log.debug("Ignoring outdated event: {}", deltaDesc);
373 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
375 final Device oldDevice = devices.get(deviceId);
376 final Device newDevice;
378 if (deltaDesc == descs.getDeviceDesc() ||
379 deltaDesc.isNewer(descs.getDeviceDesc())) {
380 // on new device or valid update
381 descs.putDeviceDesc(deltaDesc);
382 newDevice = composeDevice(deviceId, device);
384 // outdated event, ignored.
387 if (oldDevice == null) {
389 return createDevice(providerId, newDevice, deltaDesc.timestamp());
391 // UPDATE or ignore (no change or stale)
392 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
397 // Creates the device and returns the appropriate event if necessary.
398 // Guarded by deviceDescs value (=Device lock)
399 private DeviceEvent createDevice(ProviderId providerId,
400 Device newDevice, Timestamp timestamp) {
402 // update composed device cache
403 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
404 verify(oldDevice == null,
405 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
406 providerId, oldDevice, newDevice);
408 if (!providerId.isAncillary()) {
409 markOnline(newDevice.id(), timestamp);
412 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
415 // Updates the device and returns the appropriate event if necessary.
416 // Guarded by deviceDescs value (=Device lock)
417 private DeviceEvent updateDevice(ProviderId providerId,
419 Device newDevice, Timestamp newTimestamp) {
420 // We allow only certain attributes to trigger update
421 boolean propertiesChanged =
422 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
423 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
424 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
425 boolean annotationsChanged =
426 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
428 // Primary providers can respond to all changes, but ancillary ones
429 // should respond only to annotation changes.
430 if ((providerId.isAncillary() && annotationsChanged) ||
431 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
432 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
435 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
436 providerId, oldDevice, devices.get(newDevice.id())
439 if (!providerId.isAncillary()) {
440 boolean wasOnline = availableDevices.contains(newDevice.id());
441 markOnline(newDevice.id(), newTimestamp);
443 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
447 return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
453 public DeviceEvent markOffline(DeviceId deviceId) {
454 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
455 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
457 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
458 deviceId, timestamp);
459 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
464 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
466 Map<ProviderId, DeviceDescriptions> providerDescs
467 = getOrCreateDeviceDescriptionsMap(deviceId);
470 synchronized (providerDescs) {
472 // accept off-line if given timestamp is newer than
473 // the latest Timestamp from Primary provider
474 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
475 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
476 if (timestamp.compareTo(lastTimestamp) <= 0) {
477 // outdated event ignore
481 offline.put(deviceId, timestamp);
483 Device device = devices.get(deviceId);
484 if (device == null) {
487 boolean removed = availableDevices.remove(deviceId);
489 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
496 * Marks the device as available if the given timestamp is not outdated,
497 * compared to the time the device has been marked offline.
499 * @param deviceId identifier of the device
500 * @param timestamp of the event triggering this change.
501 * @return true if availability change request was accepted and changed the state
503 // Guarded by deviceDescs value (=Device lock)
504 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
505 // accept on-line if given timestamp is newer than
506 // the latest offline request Timestamp
507 Timestamp offlineTimestamp = offline.get(deviceId);
508 if (offlineTimestamp == null ||
509 offlineTimestamp.compareTo(timestamp) < 0) {
511 offline.remove(deviceId);
512 return availableDevices.add(deviceId);
518 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
520 List<PortDescription> portDescriptions) {
522 NodeId localNode = clusterService.getLocalNode().id();
523 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
524 // since it will trigger distributed store read.
525 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
526 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
527 // If we don't care much about topology performance, then it might be OK.
528 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
530 // Process port update only if we're the master of the device,
531 // otherwise signal the actual master.
532 List<DeviceEvent> deviceEvents = null;
533 if (localNode.equals(deviceNode)) {
535 final Timestamp newTimestamp;
537 newTimestamp = deviceClockService.getTimestamp(deviceId);
538 } catch (IllegalStateException e) {
539 log.info("Timestamp was not available for device {}", deviceId);
540 log.debug(" discarding {}", portDescriptions);
541 // Failed to generate timestamp.
543 // Possible situation:
544 // Device connected and became master for short period of time,
545 // but lost mastership before this instance had the chance to
546 // retrieve term information.
548 // Information dropped here is expected to be recoverable by
549 // device probing after mastership change
551 return Collections.emptyList();
553 log.debug("timestamp for {} {}", deviceId, newTimestamp);
555 final Timestamped<List<PortDescription>> timestampedInput
556 = new Timestamped<>(portDescriptions, newTimestamp);
557 final Timestamped<List<PortDescription>> merged;
559 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
561 synchronized (device) {
562 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
563 final DeviceDescriptions descs = device.get(providerId);
564 List<PortDescription> mergedList =
565 FluentIterable.from(portDescriptions)
567 // lookup merged port description
568 descs.getPortDesc(input.portNumber()).value()
570 merged = new Timestamped<>(mergedList, newTimestamp);
573 if (!deviceEvents.isEmpty()) {
574 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
575 providerId, deviceId);
576 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
580 // FIXME Temporary hack for NPE (ONOS-1171).
581 // Proper fix is to implement forwarding to master on ConfigProvider
583 if (deviceNode == null) {
585 return Collections.emptyList();
588 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
590 //TODO check unicast return value
591 clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
593 log.warn("Failed to process injected ports of device id: {} " +
594 "(cluster messaging failed: {})",
599 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
602 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
604 Timestamped<List<PortDescription>> portDescriptions) {
606 Device device = devices.get(deviceId);
607 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
609 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
610 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
612 List<DeviceEvent> events = new ArrayList<>();
613 synchronized (descsMap) {
615 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
616 log.debug("Ignoring outdated events: {}", portDescriptions);
617 return Collections.emptyList();
620 DeviceDescriptions descs = descsMap.get(providerId);
621 // every provider must provide DeviceDescription.
622 checkArgument(descs != null,
623 "Device description for Device ID %s from Provider %s was not found",
624 deviceId, providerId);
626 Map<PortNumber, Port> ports = getPortMap(deviceId);
628 final Timestamp newTimestamp = portDescriptions.timestamp();
631 Set<PortNumber> processed = new HashSet<>();
632 for (PortDescription portDescription : portDescriptions.value()) {
633 final PortNumber number = portDescription.portNumber();
634 processed.add(number);
636 final Port oldPort = ports.get(number);
640 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
641 if (existingPortDesc == null ||
642 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
643 // on new port or valid update
644 // update description
645 descs.putPortDesc(new Timestamped<>(portDescription,
646 portDescriptions.timestamp()));
647 newPort = composePort(device, number, descsMap);
649 // outdated event, ignored.
653 events.add(oldPort == null ?
654 createPort(device, newPort, ports) :
655 updatePort(device, oldPort, newPort, ports));
658 events.addAll(pruneOldPorts(device, ports, processed));
660 return FluentIterable.from(events).filter(notNull()).toList();
663 // Creates a new port based on the port description adds it to the map and
664 // Returns corresponding event.
665 // Guarded by deviceDescs value (=Device lock)
666 private DeviceEvent createPort(Device device, Port newPort,
667 Map<PortNumber, Port> ports) {
668 ports.put(newPort.number(), newPort);
669 return new DeviceEvent(PORT_ADDED, device, newPort);
672 // Checks if the specified port requires update and if so, it replaces the
673 // existing entry in the map and returns corresponding event.
674 // Guarded by deviceDescs value (=Device lock)
675 private DeviceEvent updatePort(Device device, Port oldPort,
677 Map<PortNumber, Port> ports) {
678 if (oldPort.isEnabled() != newPort.isEnabled() ||
679 oldPort.type() != newPort.type() ||
680 oldPort.portSpeed() != newPort.portSpeed() ||
681 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
682 ports.put(oldPort.number(), newPort);
683 return new DeviceEvent(PORT_UPDATED, device, newPort);
688 // Prunes the specified list of ports based on which ports are in the
689 // processed list and returns list of corresponding events.
690 // Guarded by deviceDescs value (=Device lock)
691 private List<DeviceEvent> pruneOldPorts(Device device,
692 Map<PortNumber, Port> ports,
693 Set<PortNumber> processed) {
694 List<DeviceEvent> events = new ArrayList<>();
695 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
696 while (iterator.hasNext()) {
697 Entry<PortNumber, Port> e = iterator.next();
698 PortNumber portNumber = e.getKey();
699 if (!processed.contains(portNumber)) {
700 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
707 // Gets the map of ports for the specified device; if one does not already
708 // exist, it creates and registers a new one.
709 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
710 return createIfAbsentUnchecked(devicePorts, deviceId,
711 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
714 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
716 Map<ProviderId, DeviceDescriptions> r;
717 r = deviceDescs.get(deviceId);
720 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
721 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
722 if (concurrentlyAdded != null) {
723 r = concurrentlyAdded;
729 // Guarded by deviceDescs value (=Device lock)
730 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
731 Map<ProviderId, DeviceDescriptions> device,
732 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
733 synchronized (device) {
734 DeviceDescriptions r = device.get(providerId);
736 r = new DeviceDescriptions(deltaDesc);
737 device.put(providerId, r);
744 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
746 PortDescription portDescription) {
747 final Timestamp newTimestamp;
749 newTimestamp = deviceClockService.getTimestamp(deviceId);
750 } catch (IllegalStateException e) {
751 log.info("Timestamp was not available for device {}", deviceId);
752 log.debug(" discarding {}", portDescription);
753 // Failed to generate timestamp. Ignoring.
754 // See updatePorts comment
757 final Timestamped<PortDescription> deltaDesc
758 = new Timestamped<>(portDescription, newTimestamp);
759 final DeviceEvent event;
760 final Timestamped<PortDescription> mergedDesc;
761 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
762 synchronized (device) {
763 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
764 mergedDesc = device.get(providerId)
765 .getPortDesc(portDescription.portNumber());
768 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
769 providerId, deviceId);
770 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
775 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
776 Timestamped<PortDescription> deltaDesc) {
777 Device device = devices.get(deviceId);
778 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
780 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
781 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
783 synchronized (descsMap) {
785 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
786 log.debug("Ignoring outdated event: {}", deltaDesc);
790 DeviceDescriptions descs = descsMap.get(providerId);
791 // assuming all providers must to give DeviceDescription
792 verify(descs != null,
793 "Device description for Device ID %s from Provider %s was not found",
794 deviceId, providerId);
796 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
797 final PortNumber number = deltaDesc.value().portNumber();
798 final Port oldPort = ports.get(number);
801 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
802 if (existingPortDesc == null ||
803 deltaDesc.isNewer(existingPortDesc)) {
804 // on new port or valid update
805 // update description
806 descs.putPortDesc(deltaDesc);
807 newPort = composePort(device, number, descsMap);
809 // same or outdated event, ignored.
810 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
814 if (oldPort == null) {
815 return createPort(device, newPort, ports);
817 return updatePort(device, oldPort, newPort, ports);
823 public List<Port> getPorts(DeviceId deviceId) {
824 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
826 return Collections.emptyList();
828 return ImmutableList.copyOf(ports.values());
832 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
833 Collection<PortStatistics> newStatsCollection) {
835 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
836 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
837 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
839 if (prvStatsMap != null) {
840 for (PortStatistics newStats : newStatsCollection) {
841 PortNumber port = PortNumber.portNumber(newStats.port());
842 PortStatistics prvStats = prvStatsMap.get(port);
843 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
844 PortStatistics deltaStats = builder.build();
845 if (prvStats != null) {
846 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
848 deltaStatsMap.put(port, deltaStats);
849 newStatsMap.put(port, newStats);
852 for (PortStatistics newStats : newStatsCollection) {
853 PortNumber port = PortNumber.portNumber(newStats.port());
854 newStatsMap.put(port, newStats);
857 devicePortDeltaStats.put(deviceId, deltaStatsMap);
858 devicePortStats.put(deviceId, newStatsMap);
859 // DeviceEvent returns null because of InternalPortStatsListener usage
864 * Calculate delta statistics by subtracting previous from new statistics.
866 * @param deviceId device identifier
867 * @param prvStats previous port statistics
868 * @param newStats new port statistics
869 * @return PortStatistics
871 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
872 // calculate time difference
873 long deltaStatsSec, deltaStatsNano;
874 if (newStats.durationNano() < prvStats.durationNano()) {
875 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
876 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
878 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
879 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
881 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
882 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
883 .setPort(newStats.port())
884 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
885 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
886 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
887 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
888 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
889 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
890 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
891 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
892 .setDurationSec(deltaStatsSec)
893 .setDurationNano(deltaStatsNano)
899 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
900 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
901 if (portStats == null) {
902 return Collections.emptyList();
904 return ImmutableList.copyOf(portStats.values());
908 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
909 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
910 if (portStats == null) {
911 return Collections.emptyList();
913 return ImmutableList.copyOf(portStats.values());
917 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
918 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
919 return ports == null ? null : ports.get(portNumber);
923 public boolean isAvailable(DeviceId deviceId) {
924 return availableDevices.contains(deviceId);
928 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
929 final NodeId myId = clusterService.getLocalNode().id();
930 NodeId master = mastershipService.getMasterFor(deviceId);
932 // if there exist a master, forward
933 // if there is no master, try to become one and process
935 boolean relinquishAtEnd = false;
936 if (master == null) {
937 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
938 if (myRole != MastershipRole.NONE) {
939 relinquishAtEnd = true;
941 log.debug("Temporarily requesting role for {} to remove", deviceId);
942 mastershipService.requestRoleFor(deviceId);
943 MastershipTerm term = termService.getMastershipTerm(deviceId);
944 if (term != null && myId.equals(term.master())) {
949 if (!myId.equals(master)) {
950 log.debug("{} has control of {}, forwarding remove request",
953 // TODO check unicast return value
954 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
956 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
959 // event will be triggered after master processes it.
965 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
966 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
968 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
970 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
972 if (relinquishAtEnd) {
973 log.debug("Relinquishing temporary role acquired for {}", deviceId);
974 mastershipService.relinquishMastership(deviceId);
979 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
980 Timestamp timestamp) {
982 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
983 synchronized (descs) {
984 // accept removal request if given timestamp is newer than
985 // the latest Timestamp from Primary provider
986 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
987 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
988 if (timestamp.compareTo(lastTimestamp) <= 0) {
989 // outdated event ignore
992 removalRequest.put(deviceId, timestamp);
994 Device device = devices.remove(deviceId);
995 // should DEVICE_REMOVED carry removed ports?
996 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1000 markOfflineInternal(deviceId, timestamp);
1002 return device == null ? null :
1003 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1008 * Checks if given timestamp is superseded by removal request
1009 * with more recent timestamp.
1011 * @param deviceId identifier of a device
1012 * @param timestampToCheck timestamp of an event to check
1013 * @return true if device is already removed
1015 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1016 Timestamp removalTimestamp = removalRequest.get(deviceId);
1017 if (removalTimestamp != null &&
1018 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1019 // removalRequest is more recent
1026 * Returns a Device, merging description given from multiple Providers.
1028 * @param deviceId device identifier
1029 * @param providerDescs Collection of Descriptions from multiple providers
1030 * @return Device instance
1032 private Device composeDevice(DeviceId deviceId,
1033 Map<ProviderId, DeviceDescriptions> providerDescs) {
1035 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1037 ProviderId primary = pickPrimaryPID(providerDescs);
1039 DeviceDescriptions desc = providerDescs.get(primary);
1041 final DeviceDescription base = desc.getDeviceDesc().value();
1042 Type type = base.type();
1043 String manufacturer = base.manufacturer();
1044 String hwVersion = base.hwVersion();
1045 String swVersion = base.swVersion();
1046 String serialNumber = base.serialNumber();
1047 ChassisId chassisId = base.chassisId();
1048 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1049 annotations = merge(annotations, base.annotations());
1051 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1052 if (e.getKey().equals(primary)) {
1055 // Note: should keep track of Description timestamp in the future
1056 // and only merge conflicting keys when timestamp is newer.
1057 // Currently assuming there will never be a key conflict between
1060 // annotation merging. not so efficient, should revisit later
1061 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1064 return new DefaultDevice(primary, deviceId, type, manufacturer,
1065 hwVersion, swVersion, serialNumber,
1066 chassisId, annotations);
1069 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1070 PortDescription description, Annotations annotations) {
1071 switch (description.type()) {
1073 OmsPortDescription omsDesc = (OmsPortDescription) description;
1074 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1075 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1077 OchPortDescription ochDesc = (OchPortDescription) description;
1078 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1079 ochDesc.isTunable(), ochDesc.lambda(), annotations);
1081 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1082 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1084 return new DefaultPort(device, number, isEnabled, description.type(),
1085 description.portSpeed(), annotations);
1090 * Returns a Port, merging description given from multiple Providers.
1092 * @param device device the port is on
1093 * @param number port number
1094 * @param descsMap Collection of Descriptions from multiple providers
1095 * @return Port instance
1097 private Port composePort(Device device, PortNumber number,
1098 Map<ProviderId, DeviceDescriptions> descsMap) {
1100 ProviderId primary = pickPrimaryPID(descsMap);
1101 DeviceDescriptions primDescs = descsMap.get(primary);
1102 // if no primary, assume not enabled
1103 boolean isEnabled = false;
1104 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1105 Timestamp newest = null;
1106 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1107 if (portDesc != null) {
1108 isEnabled = portDesc.value().isEnabled();
1109 annotations = merge(annotations, portDesc.value().annotations());
1110 newest = portDesc.timestamp();
1112 Port updated = null;
1113 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1114 if (e.getKey().equals(primary)) {
1117 // Note: should keep track of Description timestamp in the future
1118 // and only merge conflicting keys when timestamp is newer.
1119 // Currently assuming there will never be a key conflict between
1122 // annotation merging. not so efficient, should revisit later
1123 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1124 if (otherPortDesc != null) {
1125 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1128 annotations = merge(annotations, otherPortDesc.value().annotations());
1129 PortDescription other = otherPortDesc.value();
1130 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1131 newest = otherPortDesc.timestamp();
1134 if (portDesc == null) {
1135 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1137 PortDescription current = portDesc.value();
1138 return updated == null
1139 ? buildTypedPort(device, number, isEnabled, current, annotations)
1144 * @return primary ProviderID, or randomly chosen one if none exists
1146 private ProviderId pickPrimaryPID(
1147 Map<ProviderId, DeviceDescriptions> providerDescs) {
1148 ProviderId fallBackPrimary = null;
1149 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1150 if (!e.getKey().isAncillary()) {
1152 } else if (fallBackPrimary == null) {
1153 // pick randomly as a fallback in case there is no primary
1154 fallBackPrimary = e.getKey();
1157 return fallBackPrimary;
1160 private DeviceDescriptions getPrimaryDescriptions(
1161 Map<ProviderId, DeviceDescriptions> providerDescs) {
1162 ProviderId pid = pickPrimaryPID(providerDescs);
1163 return providerDescs.get(pid);
1166 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1167 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1170 private void broadcastMessage(MessageSubject subject, Object event) {
1171 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1174 private void notifyPeers(InternalDeviceEvent event) {
1175 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1178 private void notifyPeers(InternalDeviceOfflineEvent event) {
1179 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1182 private void notifyPeers(InternalDeviceRemovedEvent event) {
1183 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1186 private void notifyPeers(InternalPortEvent event) {
1187 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1190 private void notifyPeers(InternalPortStatusEvent event) {
1191 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1194 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1196 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1197 } catch (IOException e) {
1198 log.error("Failed to send" + event + " to " + recipient, e);
1202 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1204 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1205 } catch (IOException e) {
1206 log.error("Failed to send" + event + " to " + recipient, e);
1210 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1212 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1213 } catch (IOException e) {
1214 log.error("Failed to send" + event + " to " + recipient, e);
1218 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1220 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1221 } catch (IOException e) {
1222 log.error("Failed to send" + event + " to " + recipient, e);
1226 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1228 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1229 } catch (IOException e) {
1230 log.error("Failed to send" + event + " to " + recipient, e);
1234 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1235 final NodeId self = clusterService.getLocalNode().id();
1237 final int numDevices = deviceDescs.size();
1238 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1239 final int portsPerDevice = 8; // random factor to minimize reallocation
1240 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1241 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1243 deviceDescs.forEach((deviceId, devDescs) -> {
1245 // for each Device...
1246 synchronized (devDescs) {
1248 // send device offline timestamp
1249 Timestamp lOffline = this.offline.get(deviceId);
1250 if (lOffline != null) {
1251 adOffline.put(deviceId, lOffline);
1254 for (Entry<ProviderId, DeviceDescriptions>
1255 prov : devDescs.entrySet()) {
1257 // for each Provider Descriptions...
1258 final ProviderId provId = prov.getKey();
1259 final DeviceDescriptions descs = prov.getValue();
1261 adDevices.put(new DeviceFragmentId(deviceId, provId),
1262 descs.getDeviceDesc().timestamp());
1264 for (Entry<PortNumber, Timestamped<PortDescription>>
1265 portDesc : descs.getPortDescs().entrySet()) {
1267 final PortNumber number = portDesc.getKey();
1268 adPorts.put(new PortFragmentId(deviceId, provId, number),
1269 portDesc.getValue().timestamp());
1275 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1279 * Responds to anti-entropy advertisement message.
1281 * Notify sender about out-dated information using regular replication message.
1282 * Send back advertisement to sender if not in sync.
1284 * @param advertisement to respond to
1286 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1288 final NodeId sender = advertisement.sender();
1290 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1291 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1292 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1294 // Fragments to request
1295 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1296 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1298 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1299 final DeviceId deviceId = de.getKey();
1300 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1302 synchronized (lDevice) {
1303 // latestTimestamp across provider
1304 // Note: can be null initially
1305 Timestamp localLatest = offline.get(deviceId);
1307 // handle device Ads
1308 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1309 final ProviderId provId = prov.getKey();
1310 final DeviceDescriptions lDeviceDescs = prov.getValue();
1312 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1315 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1316 Timestamp advDevTimestamp = devAds.get(devFragId);
1318 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1320 // remote does not have it or outdated, suggest
1321 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1322 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1323 // local is outdated, request
1324 reqDevices.add(devFragId);
1328 for (Entry<PortNumber, Timestamped<PortDescription>>
1329 pe : lDeviceDescs.getPortDescs().entrySet()) {
1331 final PortNumber num = pe.getKey();
1332 final Timestamped<PortDescription> lPort = pe.getValue();
1334 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1336 Timestamp advPortTimestamp = portAds.get(portFragId);
1337 if (advPortTimestamp == null || lPort.isNewerThan(
1338 advPortTimestamp)) {
1339 // remote does not have it or outdated, suggest
1340 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1341 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1342 // local is outdated, request
1343 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1344 reqPorts.add(portFragId);
1347 // remove port Ad already processed
1348 portAds.remove(portFragId);
1349 } // end local port loop
1351 // remove device Ad already processed
1352 devAds.remove(devFragId);
1354 // find latest and update
1355 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1356 if (localLatest == null ||
1357 providerLatest.compareTo(localLatest) > 0) {
1358 localLatest = providerLatest;
1360 } // end local provider loop
1362 // checking if remote timestamp is more recent.
1363 Timestamp rOffline = offlineAds.get(deviceId);
1364 if (rOffline != null &&
1365 rOffline.compareTo(localLatest) > 0) {
1366 // remote offline timestamp suggests that the
1367 // device is off-line
1368 markOfflineInternal(deviceId, rOffline);
1371 Timestamp lOffline = offline.get(deviceId);
1372 if (lOffline != null && rOffline == null) {
1373 // locally offline, but remote is online, suggest offline
1374 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1377 // remove device offline Ad already processed
1378 offlineAds.remove(deviceId);
1379 } // end local device loop
1382 // If there is any Ads left, request them
1383 log.trace("Ads left {}, {}", devAds, portAds);
1384 reqDevices.addAll(devAds.keySet());
1385 reqPorts.addAll(portAds.keySet());
1387 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1388 log.trace("Nothing to request to remote peer {}", sender);
1392 log.debug("Need to sync {} {}", reqDevices, reqPorts);
1394 // 2-way Anti-Entropy for now
1396 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1397 } catch (IOException e) {
1398 log.error("Failed to send response advertisement to " + sender, e);
1401 // Sketch of 3-way Anti-Entropy
1402 // DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1403 // ClusterMessage message = new ClusterMessage(
1404 // clusterService.getLocalNode().id(),
1405 // GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1406 // SERIALIZER.encode(request));
1409 // clusterCommunicator.unicast(message, advertisement.sender());
1410 // } catch (IOException e) {
1411 // log.error("Failed to send advertisement reply to "
1412 // + advertisement.sender(), e);
1416 private void notifyDelegateIfNotNull(DeviceEvent event) {
1417 if (event != null) {
1418 notifyDelegate(event);
1422 private final class SendAdvertisementTask implements Runnable {
1426 if (Thread.currentThread().isInterrupted()) {
1427 log.debug("Interrupted, quitting");
1432 final NodeId self = clusterService.getLocalNode().id();
1433 Set<ControllerNode> nodes = clusterService.getNodes();
1435 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1436 .transform(toNodeId())
1439 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1440 log.trace("No other peers in the cluster.");
1446 int idx = RandomUtils.nextInt(0, nodeIds.size());
1447 peer = nodeIds.get(idx);
1448 } while (peer.equals(self));
1450 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1452 if (Thread.currentThread().isInterrupted()) {
1453 log.debug("Interrupted, quitting");
1458 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1459 } catch (IOException e) {
1460 log.debug("Failed to send anti-entropy advertisement to {}", peer);
1463 } catch (Exception e) {
1464 // catch all Exception to avoid Scheduled task being suppressed.
1465 log.error("Exception thrown while sending advertisement", e);
1470 private final class InternalDeviceEventListener
1471 implements ClusterMessageHandler {
1473 public void handle(ClusterMessage message) {
1474 log.debug("Received device update event from peer: {}", message.sender());
1475 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1477 ProviderId providerId = event.providerId();
1478 DeviceId deviceId = event.deviceId();
1479 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1482 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1483 } catch (Exception e) {
1484 log.warn("Exception thrown handling device update", e);
1489 private final class InternalDeviceOfflineEventListener
1490 implements ClusterMessageHandler {
1492 public void handle(ClusterMessage message) {
1493 log.debug("Received device offline event from peer: {}", message.sender());
1494 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1496 DeviceId deviceId = event.deviceId();
1497 Timestamp timestamp = event.timestamp();
1500 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1501 } catch (Exception e) {
1502 log.warn("Exception thrown handling device offline", e);
1507 private final class InternalRemoveRequestListener
1508 implements ClusterMessageHandler {
1510 public void handle(ClusterMessage message) {
1511 log.debug("Received device remove request from peer: {}", message.sender());
1512 DeviceId did = SERIALIZER.decode(message.payload());
1516 } catch (Exception e) {
1517 log.warn("Exception thrown handling device remove", e);
1522 private final class InternalDeviceRemovedEventListener
1523 implements ClusterMessageHandler {
1525 public void handle(ClusterMessage message) {
1526 log.debug("Received device removed event from peer: {}", message.sender());
1527 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1529 DeviceId deviceId = event.deviceId();
1530 Timestamp timestamp = event.timestamp();
1533 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1534 } catch (Exception e) {
1535 log.warn("Exception thrown handling device removed", e);
1540 private final class InternalPortEventListener
1541 implements ClusterMessageHandler {
1543 public void handle(ClusterMessage message) {
1545 log.debug("Received port update event from peer: {}", message.sender());
1546 InternalPortEvent event = SERIALIZER.decode(message.payload());
1548 ProviderId providerId = event.providerId();
1549 DeviceId deviceId = event.deviceId();
1550 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1552 if (getDevice(deviceId) == null) {
1553 log.debug("{} not found on this node yet, ignoring.", deviceId);
1554 // Note: dropped information will be recovered by anti-entropy
1559 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1560 } catch (Exception e) {
1561 log.warn("Exception thrown handling port update", e);
1566 private final class InternalPortStatusEventListener
1567 implements ClusterMessageHandler {
1569 public void handle(ClusterMessage message) {
1571 log.debug("Received port status update event from peer: {}", message.sender());
1572 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1574 ProviderId providerId = event.providerId();
1575 DeviceId deviceId = event.deviceId();
1576 Timestamped<PortDescription> portDescription = event.portDescription();
1578 if (getDevice(deviceId) == null) {
1579 log.debug("{} not found on this node yet, ignoring.", deviceId);
1580 // Note: dropped information will be recovered by anti-entropy
1585 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1586 } catch (Exception e) {
1587 log.warn("Exception thrown handling port update", e);
1592 private final class InternalDeviceAdvertisementListener
1593 implements ClusterMessageHandler {
1595 public void handle(ClusterMessage message) {
1596 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1597 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1599 handleAdvertisement(advertisement);
1600 } catch (Exception e) {
1601 log.warn("Exception thrown handling Device advertisements.", e);
1606 private final class DeviceInjectedEventListener
1607 implements ClusterMessageHandler {
1609 public void handle(ClusterMessage message) {
1610 log.debug("Received injected device event from peer: {}", message.sender());
1611 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1613 ProviderId providerId = event.providerId();
1614 DeviceId deviceId = event.deviceId();
1615 DeviceDescription deviceDescription = event.deviceDescription();
1616 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1617 // workaround for ONOS-1208
1618 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1623 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1624 } catch (Exception e) {
1625 log.warn("Exception thrown handling device injected event.", e);
1630 private final class PortInjectedEventListener
1631 implements ClusterMessageHandler {
1633 public void handle(ClusterMessage message) {
1634 log.debug("Received injected port event from peer: {}", message.sender());
1635 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1637 ProviderId providerId = event.providerId();
1638 DeviceId deviceId = event.deviceId();
1639 List<PortDescription> portDescriptions = event.portDescriptions();
1640 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1641 // workaround for ONOS-1208
1642 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1647 updatePorts(providerId, deviceId, portDescriptions);
1648 } catch (Exception e) {
1649 log.warn("Exception thrown handling port injected event.", e);
1654 private class InternalPortStatsListener
1655 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1657 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1658 if (event.type() == PUT) {
1659 Device device = devices.get(event.key());
1660 if (device != null) {
1661 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));