2 * Copyright 2014-2015 Open Networking Laboratory
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
16 package org.onosproject.store.device.impl;
18 import com.google.common.collect.FluentIterable;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
23 import org.apache.commons.lang3.RandomUtils;
24 import org.apache.felix.scr.annotations.Activate;
25 import org.apache.felix.scr.annotations.Component;
26 import org.apache.felix.scr.annotations.Deactivate;
27 import org.apache.felix.scr.annotations.Reference;
28 import org.apache.felix.scr.annotations.ReferenceCardinality;
29 import org.apache.felix.scr.annotations.Service;
30 import org.onlab.packet.ChassisId;
31 import org.onlab.util.KryoNamespace;
32 import org.onlab.util.NewConcurrentHashMap;
33 import org.onosproject.cluster.ClusterService;
34 import org.onosproject.cluster.ControllerNode;
35 import org.onosproject.cluster.NodeId;
36 import org.onosproject.mastership.MastershipService;
37 import org.onosproject.mastership.MastershipTerm;
38 import org.onosproject.mastership.MastershipTermService;
39 import org.onosproject.net.Annotations;
40 import org.onosproject.net.AnnotationsUtil;
41 import org.onosproject.net.DefaultAnnotations;
42 import org.onosproject.net.DefaultDevice;
43 import org.onosproject.net.DefaultPort;
44 import org.onosproject.net.Device;
45 import org.onosproject.net.Device.Type;
46 import org.onosproject.net.DeviceId;
47 import org.onosproject.net.MastershipRole;
48 import org.onosproject.net.OchPort;
49 import org.onosproject.net.OduCltPort;
50 import org.onosproject.net.OmsPort;
51 import org.onosproject.net.Port;
52 import org.onosproject.net.PortNumber;
53 import org.onosproject.net.device.DefaultPortStatistics;
54 import org.onosproject.net.device.DeviceClockService;
55 import org.onosproject.net.device.DeviceDescription;
56 import org.onosproject.net.device.DeviceEvent;
57 import org.onosproject.net.device.DeviceStore;
58 import org.onosproject.net.device.DeviceStoreDelegate;
59 import org.onosproject.net.device.OchPortDescription;
60 import org.onosproject.net.device.OduCltPortDescription;
61 import org.onosproject.net.device.OmsPortDescription;
62 import org.onosproject.net.device.PortDescription;
63 import org.onosproject.net.device.PortStatistics;
64 import org.onosproject.net.provider.ProviderId;
65 import org.onosproject.store.AbstractStore;
66 import org.onosproject.store.Timestamp;
67 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
68 import org.onosproject.store.cluster.messaging.ClusterMessage;
69 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
70 import org.onosproject.store.cluster.messaging.MessageSubject;
71 import org.onosproject.store.impl.Timestamped;
72 import org.onosproject.store.serializers.KryoNamespaces;
73 import org.onosproject.store.serializers.KryoSerializer;
74 import org.onosproject.store.serializers.custom.DistributedStoreSerializers;
75 import org.onosproject.store.service.EventuallyConsistentMap;
76 import org.onosproject.store.service.EventuallyConsistentMapEvent;
77 import org.onosproject.store.service.EventuallyConsistentMapListener;
78 import org.onosproject.store.service.MultiValuedTimestamp;
79 import org.onosproject.store.service.StorageService;
80 import org.onosproject.store.service.WallClockTimestamp;
81 import org.slf4j.Logger;
83 import java.io.IOException;
84 import java.util.ArrayList;
85 import java.util.Collection;
86 import java.util.Collections;
87 import java.util.HashMap;
88 import java.util.HashSet;
89 import java.util.Iterator;
90 import java.util.List;
92 import java.util.Map.Entry;
93 import java.util.Objects;
95 import java.util.concurrent.ConcurrentMap;
96 import java.util.concurrent.ExecutorService;
97 import java.util.concurrent.Executors;
98 import java.util.concurrent.ScheduledExecutorService;
99 import java.util.concurrent.TimeUnit;
101 import static com.google.common.base.Preconditions.checkArgument;
102 import static com.google.common.base.Predicates.notNull;
103 import static com.google.common.base.Verify.verify;
104 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
105 import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
106 import static org.onlab.util.Tools.groupedThreads;
107 import static org.onlab.util.Tools.minPriority;
108 import static org.onosproject.cluster.ControllerNodeToNodeId.toNodeId;
109 import static org.onosproject.net.DefaultAnnotations.merge;
110 import static org.onosproject.net.device.DeviceEvent.Type.*;
111 import static org.onosproject.store.device.impl.GossipDeviceStoreMessageSubjects.*;
112 import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
113 import static org.slf4j.LoggerFactory.getLogger;
116 * Manages inventory of infrastructure devices using gossip protocol to distribute
119 @Component(immediate = true)
121 public class GossipDeviceStore
122 extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
123 implements DeviceStore {
125 private final Logger log = getLogger(getClass());
127 private static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
128 // Timeout in milliseconds to process device or ports on remote master node
129 private static final int REMOTE_MASTER_TIMEOUT = 1000;
131 // innerMap is used to lock a Device, thus instance should never be replaced.
132 // collection of Description given from various providers
133 private final ConcurrentMap<DeviceId, Map<ProviderId, DeviceDescriptions>>
134 deviceDescs = Maps.newConcurrentMap();
136 // cache of Device and Ports generated by compositing descriptions from providers
137 private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
138 private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
140 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortStats;
141 private EventuallyConsistentMap<DeviceId, Map<PortNumber, PortStatistics>> devicePortDeltaStats;
142 private final EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>>
143 portStatsListener = new InternalPortStatsListener();
145 // to be updated under Device lock
146 private final Map<DeviceId, Timestamp> offline = Maps.newHashMap();
147 private final Map<DeviceId, Timestamp> removalRequest = Maps.newHashMap();
149 // available(=UP) devices
150 private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
152 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
153 protected DeviceClockService deviceClockService;
155 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
156 protected StorageService storageService;
158 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
159 protected ClusterCommunicationService clusterCommunicator;
161 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
162 protected ClusterService clusterService;
164 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
165 protected MastershipService mastershipService;
167 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
168 protected MastershipTermService termService;
171 protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
173 protected void setupKryoPool() {
174 serializerPool = KryoNamespace.newBuilder()
175 .register(DistributedStoreSerializers.STORE_COMMON)
176 .nextId(DistributedStoreSerializers.STORE_CUSTOM_BEGIN)
177 .register(new InternalDeviceEventSerializer(), InternalDeviceEvent.class)
178 .register(new InternalDeviceOfflineEventSerializer(), InternalDeviceOfflineEvent.class)
179 .register(InternalDeviceRemovedEvent.class)
180 .register(new InternalPortEventSerializer(), InternalPortEvent.class)
181 .register(new InternalPortStatusEventSerializer(), InternalPortStatusEvent.class)
182 .register(DeviceAntiEntropyAdvertisement.class)
183 .register(DeviceFragmentId.class)
184 .register(PortFragmentId.class)
185 .register(DeviceInjectedEvent.class)
186 .register(PortInjectedEvent.class)
191 private ExecutorService executor;
193 private ScheduledExecutorService backgroundExecutor;
195 // TODO make these anti-entropy parameters configurable
196 private long initialDelaySec = 5;
197 private long periodSec = 5;
200 public void activate() {
201 executor = Executors.newCachedThreadPool(groupedThreads("onos/device", "fg-%d"));
204 newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d")));
206 clusterCommunicator.addSubscriber(
207 GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener(), executor);
208 clusterCommunicator.addSubscriber(
209 GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
210 new InternalDeviceOfflineEventListener(),
212 clusterCommunicator.addSubscriber(DEVICE_REMOVE_REQ,
213 new InternalRemoveRequestListener(),
215 clusterCommunicator.addSubscriber(
216 GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener(), executor);
217 clusterCommunicator.addSubscriber(
218 GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener(), executor);
219 clusterCommunicator.addSubscriber(
220 GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener(), executor);
221 clusterCommunicator.addSubscriber(
222 GossipDeviceStoreMessageSubjects.DEVICE_ADVERTISE,
223 new InternalDeviceAdvertisementListener(),
225 clusterCommunicator.addSubscriber(
226 GossipDeviceStoreMessageSubjects.DEVICE_INJECTED, new DeviceInjectedEventListener(), executor);
227 clusterCommunicator.addSubscriber(
228 GossipDeviceStoreMessageSubjects.PORT_INJECTED, new PortInjectedEventListener(), executor);
230 // start anti-entropy thread
231 backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
232 initialDelaySec, periodSec, TimeUnit.SECONDS);
234 // Create a distributed map for port stats.
235 KryoNamespace.Builder deviceDataSerializer = KryoNamespace.newBuilder()
236 .register(KryoNamespaces.API)
237 .register(DefaultPortStatistics.class)
238 .register(DeviceId.class)
239 .register(MultiValuedTimestamp.class)
240 .register(WallClockTimestamp.class);
242 devicePortStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>eventuallyConsistentMapBuilder()
243 .withName("port-stats")
244 .withSerializer(deviceDataSerializer)
245 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
246 .withTimestampProvider((k, v) -> new WallClockTimestamp())
247 .withTombstonesDisabled()
249 devicePortDeltaStats = storageService.<DeviceId, Map<PortNumber, PortStatistics>>
250 eventuallyConsistentMapBuilder()
251 .withName("port-stats-delta")
252 .withSerializer(deviceDataSerializer)
253 .withAntiEntropyPeriod(5, TimeUnit.SECONDS)
254 .withTimestampProvider((k, v) -> new WallClockTimestamp())
255 .withTombstonesDisabled()
257 devicePortStats.addListener(portStatsListener);
262 public void deactivate() {
263 devicePortStats.destroy();
264 devicePortDeltaStats.destroy();
265 executor.shutdownNow();
267 backgroundExecutor.shutdownNow();
269 if (!backgroundExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
270 log.error("Timeout during executor shutdown");
272 } catch (InterruptedException e) {
273 log.error("Error during executor shutdown", e);
279 availableDevices.clear();
284 public int getDeviceCount() {
285 return devices.size();
289 public Iterable<Device> getDevices() {
290 return Collections.unmodifiableCollection(devices.values());
294 public Iterable<Device> getAvailableDevices() {
295 return FluentIterable.from(getDevices())
296 .filter(input -> isAvailable(input.id()));
300 public Device getDevice(DeviceId deviceId) {
301 return devices.get(deviceId);
305 public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
307 DeviceDescription deviceDescription) {
308 NodeId localNode = clusterService.getLocalNode().id();
309 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
311 // Process device update only if we're the master,
312 // otherwise signal the actual master.
313 DeviceEvent deviceEvent = null;
314 if (localNode.equals(deviceNode)) {
316 final Timestamp newTimestamp = deviceClockService.getTimestamp(deviceId);
317 final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
318 final Timestamped<DeviceDescription> mergedDesc;
319 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
321 synchronized (device) {
322 deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
323 mergedDesc = device.get(providerId).getDeviceDesc();
326 if (deviceEvent != null) {
327 log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
328 providerId, deviceId);
329 notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
333 // FIXME Temporary hack for NPE (ONOS-1171).
334 // Proper fix is to implement forwarding to master on ConfigProvider
336 if (deviceNode == null) {
342 DeviceInjectedEvent deviceInjectedEvent = new DeviceInjectedEvent(
343 providerId, deviceId, deviceDescription);
345 // TODO check unicast return value
346 clusterCommunicator.unicast(deviceInjectedEvent, DEVICE_INJECTED, SERIALIZER::encode, deviceNode);
348 log.warn("Failed to process injected device id: {} desc: {} " +
349 "(cluster messaging failed: {})",
350 deviceId, deviceDescription, e);
357 private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
359 Timestamped<DeviceDescription> deltaDesc) {
361 // Collection of DeviceDescriptions for a Device
362 Map<ProviderId, DeviceDescriptions> device
363 = getOrCreateDeviceDescriptionsMap(deviceId);
365 synchronized (device) {
366 // locking per device
368 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
369 log.debug("Ignoring outdated event: {}", deltaDesc);
373 DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);
375 final Device oldDevice = devices.get(deviceId);
376 final Device newDevice;
378 if (deltaDesc == descs.getDeviceDesc() ||
379 deltaDesc.isNewer(descs.getDeviceDesc())) {
380 // on new device or valid update
381 descs.putDeviceDesc(deltaDesc);
382 newDevice = composeDevice(deviceId, device);
384 // outdated event, ignored.
387 if (oldDevice == null) {
389 return createDevice(providerId, newDevice, deltaDesc.timestamp());
391 // UPDATE or ignore (no change or stale)
392 return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
397 // Creates the device and returns the appropriate event if necessary.
398 // Guarded by deviceDescs value (=Device lock)
399 private DeviceEvent createDevice(ProviderId providerId,
400 Device newDevice, Timestamp timestamp) {
402 // update composed device cache
403 Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
404 verify(oldDevice == null,
405 "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
406 providerId, oldDevice, newDevice);
408 if (!providerId.isAncillary()) {
409 markOnline(newDevice.id(), timestamp);
412 return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
415 // Updates the device and returns the appropriate event if necessary.
416 // Guarded by deviceDescs value (=Device lock)
417 private DeviceEvent updateDevice(ProviderId providerId,
419 Device newDevice, Timestamp newTimestamp) {
420 // We allow only certain attributes to trigger update
421 boolean propertiesChanged =
422 !Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
423 !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
424 !Objects.equals(oldDevice.providerId(), newDevice.providerId());
425 boolean annotationsChanged =
426 !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations());
428 // Primary providers can respond to all changes, but ancillary ones
429 // should respond only to annotation changes.
430 DeviceEvent event = null;
431 if ((providerId.isAncillary() && annotationsChanged) ||
432 (!providerId.isAncillary() && (propertiesChanged || annotationsChanged))) {
433 boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
436 "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
437 providerId, oldDevice, devices.get(newDevice.id())
441 event = new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
444 if (!providerId.isAncillary()) {
445 boolean wasOnline = availableDevices.contains(newDevice.id());
446 markOnline(newDevice.id(), newTimestamp);
448 notifyDelegateIfNotNull(new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null));
455 public DeviceEvent markOffline(DeviceId deviceId) {
456 final Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
457 final DeviceEvent event = markOfflineInternal(deviceId, timestamp);
459 log.debug("Notifying peers of a device offline topology event for deviceId: {} {}",
460 deviceId, timestamp);
461 notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
466 private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
468 Map<ProviderId, DeviceDescriptions> providerDescs
469 = getOrCreateDeviceDescriptionsMap(deviceId);
472 synchronized (providerDescs) {
474 // accept off-line if given timestamp is newer than
475 // the latest Timestamp from Primary provider
476 DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
477 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
478 if (timestamp.compareTo(lastTimestamp) <= 0) {
479 // outdated event ignore
483 offline.put(deviceId, timestamp);
485 Device device = devices.get(deviceId);
486 if (device == null) {
489 boolean removed = availableDevices.remove(deviceId);
491 return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
498 * Marks the device as available if the given timestamp is not outdated,
499 * compared to the time the device has been marked offline.
501 * @param deviceId identifier of the device
502 * @param timestamp of the event triggering this change.
503 * @return true if availability change request was accepted and changed the state
505 // Guarded by deviceDescs value (=Device lock)
506 private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
507 // accept on-line if given timestamp is newer than
508 // the latest offline request Timestamp
509 Timestamp offlineTimestamp = offline.get(deviceId);
510 if (offlineTimestamp == null ||
511 offlineTimestamp.compareTo(timestamp) < 0) {
513 offline.remove(deviceId);
514 return availableDevices.add(deviceId);
520 public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
522 List<PortDescription> portDescriptions) {
524 NodeId localNode = clusterService.getLocalNode().id();
525 // TODO: It might be negligible, but this will have negative impact to topology discovery performance,
526 // since it will trigger distributed store read.
527 // Also, it'll probably be better if side-way communication happened on ConfigurationProvider, etc.
528 // outside Device subsystem. so that we don't have to modify both Device and Link stores.
529 // If we don't care much about topology performance, then it might be OK.
530 NodeId deviceNode = mastershipService.getMasterFor(deviceId);
532 // Process port update only if we're the master of the device,
533 // otherwise signal the actual master.
534 List<DeviceEvent> deviceEvents = null;
535 if (localNode.equals(deviceNode)) {
537 final Timestamp newTimestamp;
539 newTimestamp = deviceClockService.getTimestamp(deviceId);
540 } catch (IllegalStateException e) {
541 log.info("Timestamp was not available for device {}", deviceId);
542 log.debug(" discarding {}", portDescriptions);
543 // Failed to generate timestamp.
545 // Possible situation:
546 // Device connected and became master for short period of time,
547 // but lost mastership before this instance had the chance to
548 // retrieve term information.
550 // Information dropped here is expected to be recoverable by
551 // device probing after mastership change
553 return Collections.emptyList();
555 log.debug("timestamp for {} {}", deviceId, newTimestamp);
557 final Timestamped<List<PortDescription>> timestampedInput
558 = new Timestamped<>(portDescriptions, newTimestamp);
559 final Timestamped<List<PortDescription>> merged;
561 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
563 synchronized (device) {
564 deviceEvents = updatePortsInternal(providerId, deviceId, timestampedInput);
565 final DeviceDescriptions descs = device.get(providerId);
566 List<PortDescription> mergedList =
567 FluentIterable.from(portDescriptions)
569 // lookup merged port description
570 descs.getPortDesc(input.portNumber()).value()
572 merged = new Timestamped<>(mergedList, newTimestamp);
575 if (!deviceEvents.isEmpty()) {
576 log.debug("Notifying peers of a ports update topology event for providerId: {} and deviceId: {}",
577 providerId, deviceId);
578 notifyPeers(new InternalPortEvent(providerId, deviceId, merged));
582 // FIXME Temporary hack for NPE (ONOS-1171).
583 // Proper fix is to implement forwarding to master on ConfigProvider
585 if (deviceNode == null) {
587 return Collections.emptyList();
590 PortInjectedEvent portInjectedEvent = new PortInjectedEvent(providerId, deviceId, portDescriptions);
592 //TODO check unicast return value
593 clusterCommunicator.unicast(portInjectedEvent, PORT_INJECTED, SERIALIZER::encode, deviceNode);
595 log.warn("Failed to process injected ports of device id: {} " +
596 "(cluster messaging failed: {})",
601 return deviceEvents == null ? Collections.emptyList() : deviceEvents;
604 private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
606 Timestamped<List<PortDescription>> portDescriptions) {
608 Device device = devices.get(deviceId);
609 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
611 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
612 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
614 List<DeviceEvent> events = new ArrayList<>();
615 synchronized (descsMap) {
617 if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
618 log.debug("Ignoring outdated events: {}", portDescriptions);
619 return Collections.emptyList();
622 DeviceDescriptions descs = descsMap.get(providerId);
623 // every provider must provide DeviceDescription.
624 checkArgument(descs != null,
625 "Device description for Device ID %s from Provider %s was not found",
626 deviceId, providerId);
628 Map<PortNumber, Port> ports = getPortMap(deviceId);
630 final Timestamp newTimestamp = portDescriptions.timestamp();
633 Set<PortNumber> processed = new HashSet<>();
634 for (PortDescription portDescription : portDescriptions.value()) {
635 final PortNumber number = portDescription.portNumber();
636 processed.add(number);
638 final Port oldPort = ports.get(number);
642 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
643 if (existingPortDesc == null ||
644 newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
645 // on new port or valid update
646 // update description
647 descs.putPortDesc(new Timestamped<>(portDescription,
648 portDescriptions.timestamp()));
649 newPort = composePort(device, number, descsMap);
651 // outdated event, ignored.
655 events.add(oldPort == null ?
656 createPort(device, newPort, ports) :
657 updatePort(device, oldPort, newPort, ports));
660 events.addAll(pruneOldPorts(device, ports, processed));
662 return FluentIterable.from(events).filter(notNull()).toList();
665 // Creates a new port based on the port description adds it to the map and
666 // Returns corresponding event.
667 // Guarded by deviceDescs value (=Device lock)
668 private DeviceEvent createPort(Device device, Port newPort,
669 Map<PortNumber, Port> ports) {
670 ports.put(newPort.number(), newPort);
671 return new DeviceEvent(PORT_ADDED, device, newPort);
674 // Checks if the specified port requires update and if so, it replaces the
675 // existing entry in the map and returns corresponding event.
676 // Guarded by deviceDescs value (=Device lock)
677 private DeviceEvent updatePort(Device device, Port oldPort,
679 Map<PortNumber, Port> ports) {
680 if (oldPort.isEnabled() != newPort.isEnabled() ||
681 oldPort.type() != newPort.type() ||
682 oldPort.portSpeed() != newPort.portSpeed() ||
683 !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
684 ports.put(oldPort.number(), newPort);
685 return new DeviceEvent(PORT_UPDATED, device, newPort);
690 // Prunes the specified list of ports based on which ports are in the
691 // processed list and returns list of corresponding events.
692 // Guarded by deviceDescs value (=Device lock)
693 private List<DeviceEvent> pruneOldPorts(Device device,
694 Map<PortNumber, Port> ports,
695 Set<PortNumber> processed) {
696 List<DeviceEvent> events = new ArrayList<>();
697 Iterator<Entry<PortNumber, Port>> iterator = ports.entrySet().iterator();
698 while (iterator.hasNext()) {
699 Entry<PortNumber, Port> e = iterator.next();
700 PortNumber portNumber = e.getKey();
701 if (!processed.contains(portNumber)) {
702 events.add(new DeviceEvent(PORT_REMOVED, device, e.getValue()));
709 // Gets the map of ports for the specified device; if one does not already
710 // exist, it creates and registers a new one.
711 private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
712 return createIfAbsentUnchecked(devicePorts, deviceId,
713 NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
716 private Map<ProviderId, DeviceDescriptions> getOrCreateDeviceDescriptionsMap(
718 Map<ProviderId, DeviceDescriptions> r;
719 r = deviceDescs.get(deviceId);
722 final Map<ProviderId, DeviceDescriptions> concurrentlyAdded;
723 concurrentlyAdded = deviceDescs.putIfAbsent(deviceId, r);
724 if (concurrentlyAdded != null) {
725 r = concurrentlyAdded;
731 // Guarded by deviceDescs value (=Device lock)
732 private DeviceDescriptions getOrCreateProviderDeviceDescriptions(
733 Map<ProviderId, DeviceDescriptions> device,
734 ProviderId providerId, Timestamped<DeviceDescription> deltaDesc) {
735 synchronized (device) {
736 DeviceDescriptions r = device.get(providerId);
738 r = new DeviceDescriptions(deltaDesc);
739 device.put(providerId, r);
746 public synchronized DeviceEvent updatePortStatus(ProviderId providerId,
748 PortDescription portDescription) {
749 final Timestamp newTimestamp;
751 newTimestamp = deviceClockService.getTimestamp(deviceId);
752 } catch (IllegalStateException e) {
753 log.info("Timestamp was not available for device {}", deviceId);
754 log.debug(" discarding {}", portDescription);
755 // Failed to generate timestamp. Ignoring.
756 // See updatePorts comment
759 final Timestamped<PortDescription> deltaDesc
760 = new Timestamped<>(portDescription, newTimestamp);
761 final DeviceEvent event;
762 final Timestamped<PortDescription> mergedDesc;
763 final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);
764 synchronized (device) {
765 event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
766 mergedDesc = device.get(providerId)
767 .getPortDesc(portDescription.portNumber());
770 log.debug("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
771 providerId, deviceId);
772 notifyPeers(new InternalPortStatusEvent(providerId, deviceId, mergedDesc));
777 private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
778 Timestamped<PortDescription> deltaDesc) {
779 Device device = devices.get(deviceId);
780 checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
782 Map<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
783 checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
785 synchronized (descsMap) {
787 if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
788 log.debug("Ignoring outdated event: {}", deltaDesc);
792 DeviceDescriptions descs = descsMap.get(providerId);
793 // assuming all providers must to give DeviceDescription
794 verify(descs != null,
795 "Device description for Device ID %s from Provider %s was not found",
796 deviceId, providerId);
798 ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
799 final PortNumber number = deltaDesc.value().portNumber();
800 final Port oldPort = ports.get(number);
803 final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
804 if (existingPortDesc == null ||
805 deltaDesc.isNewer(existingPortDesc)) {
806 // on new port or valid update
807 // update description
808 descs.putPortDesc(deltaDesc);
809 newPort = composePort(device, number, descsMap);
811 // same or outdated event, ignored.
812 log.trace("ignore same or outdated {} >= {}", existingPortDesc, deltaDesc);
816 if (oldPort == null) {
817 return createPort(device, newPort, ports);
819 return updatePort(device, oldPort, newPort, ports);
825 public List<Port> getPorts(DeviceId deviceId) {
826 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
828 return Collections.emptyList();
830 return ImmutableList.copyOf(ports.values());
834 public DeviceEvent updatePortStatistics(ProviderId providerId, DeviceId deviceId,
835 Collection<PortStatistics> newStatsCollection) {
837 Map<PortNumber, PortStatistics> prvStatsMap = devicePortStats.get(deviceId);
838 Map<PortNumber, PortStatistics> newStatsMap = Maps.newHashMap();
839 Map<PortNumber, PortStatistics> deltaStatsMap = Maps.newHashMap();
841 if (prvStatsMap != null) {
842 for (PortStatistics newStats : newStatsCollection) {
843 PortNumber port = PortNumber.portNumber(newStats.port());
844 PortStatistics prvStats = prvStatsMap.get(port);
845 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
846 PortStatistics deltaStats = builder.build();
847 if (prvStats != null) {
848 deltaStats = calcDeltaStats(deviceId, prvStats, newStats);
850 deltaStatsMap.put(port, deltaStats);
851 newStatsMap.put(port, newStats);
854 for (PortStatistics newStats : newStatsCollection) {
855 PortNumber port = PortNumber.portNumber(newStats.port());
856 newStatsMap.put(port, newStats);
859 devicePortDeltaStats.put(deviceId, deltaStatsMap);
860 devicePortStats.put(deviceId, newStatsMap);
861 // DeviceEvent returns null because of InternalPortStatsListener usage
866 * Calculate delta statistics by subtracting previous from new statistics.
868 * @param deviceId device identifier
869 * @param prvStats previous port statistics
870 * @param newStats new port statistics
871 * @return PortStatistics
873 public PortStatistics calcDeltaStats(DeviceId deviceId, PortStatistics prvStats, PortStatistics newStats) {
874 // calculate time difference
875 long deltaStatsSec, deltaStatsNano;
876 if (newStats.durationNano() < prvStats.durationNano()) {
877 deltaStatsNano = newStats.durationNano() - prvStats.durationNano() + TimeUnit.SECONDS.toNanos(1);
878 deltaStatsSec = newStats.durationSec() - prvStats.durationSec() - 1L;
880 deltaStatsNano = newStats.durationNano() - prvStats.durationNano();
881 deltaStatsSec = newStats.durationSec() - prvStats.durationSec();
883 DefaultPortStatistics.Builder builder = DefaultPortStatistics.builder();
884 DefaultPortStatistics deltaStats = builder.setDeviceId(deviceId)
885 .setPort(newStats.port())
886 .setPacketsReceived(newStats.packetsReceived() - prvStats.packetsReceived())
887 .setPacketsSent(newStats.packetsSent() - prvStats.packetsSent())
888 .setBytesReceived(newStats.bytesReceived() - prvStats.bytesReceived())
889 .setBytesSent(newStats.bytesSent() - prvStats.bytesSent())
890 .setPacketsRxDropped(newStats.packetsRxDropped() - prvStats.packetsRxDropped())
891 .setPacketsTxDropped(newStats.packetsTxDropped() - prvStats.packetsTxDropped())
892 .setPacketsRxErrors(newStats.packetsRxErrors() - prvStats.packetsRxErrors())
893 .setPacketsTxErrors(newStats.packetsTxErrors() - prvStats.packetsTxErrors())
894 .setDurationSec(deltaStatsSec)
895 .setDurationNano(deltaStatsNano)
901 public List<PortStatistics> getPortStatistics(DeviceId deviceId) {
902 Map<PortNumber, PortStatistics> portStats = devicePortStats.get(deviceId);
903 if (portStats == null) {
904 return Collections.emptyList();
906 return ImmutableList.copyOf(portStats.values());
910 public List<PortStatistics> getPortDeltaStatistics(DeviceId deviceId) {
911 Map<PortNumber, PortStatistics> portStats = devicePortDeltaStats.get(deviceId);
912 if (portStats == null) {
913 return Collections.emptyList();
915 return ImmutableList.copyOf(portStats.values());
919 public Port getPort(DeviceId deviceId, PortNumber portNumber) {
920 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
921 return ports == null ? null : ports.get(portNumber);
925 public boolean isAvailable(DeviceId deviceId) {
926 return availableDevices.contains(deviceId);
930 public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
931 final NodeId myId = clusterService.getLocalNode().id();
932 NodeId master = mastershipService.getMasterFor(deviceId);
934 // if there exist a master, forward
935 // if there is no master, try to become one and process
937 boolean relinquishAtEnd = false;
938 if (master == null) {
939 final MastershipRole myRole = mastershipService.getLocalRole(deviceId);
940 if (myRole != MastershipRole.NONE) {
941 relinquishAtEnd = true;
943 log.debug("Temporarily requesting role for {} to remove", deviceId);
944 mastershipService.requestRoleFor(deviceId);
945 MastershipTerm term = termService.getMastershipTerm(deviceId);
946 if (term != null && myId.equals(term.master())) {
951 if (!myId.equals(master)) {
952 log.debug("{} has control of {}, forwarding remove request",
955 // TODO check unicast return value
956 clusterCommunicator.unicast(deviceId, DEVICE_REMOVE_REQ, SERIALIZER::encode, master);
958 log.error("Failed to forward {} remove request to {}", deviceId, master, e);
961 // event will be triggered after master processes it.
967 Timestamp timestamp = deviceClockService.getTimestamp(deviceId);
968 DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
970 log.debug("Notifying peers of a device removed topology event for deviceId: {}",
972 notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
974 if (relinquishAtEnd) {
975 log.debug("Relinquishing temporary role acquired for {}", deviceId);
976 mastershipService.relinquishMastership(deviceId);
981 private DeviceEvent removeDeviceInternal(DeviceId deviceId,
982 Timestamp timestamp) {
984 Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
985 synchronized (descs) {
986 // accept removal request if given timestamp is newer than
987 // the latest Timestamp from Primary provider
988 DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
989 Timestamp lastTimestamp = primDescs.getLatestTimestamp();
990 if (timestamp.compareTo(lastTimestamp) <= 0) {
991 // outdated event ignore
994 removalRequest.put(deviceId, timestamp);
996 Device device = devices.remove(deviceId);
997 // should DEVICE_REMOVED carry removed ports?
998 Map<PortNumber, Port> ports = devicePorts.get(deviceId);
1002 markOfflineInternal(deviceId, timestamp);
1004 return device == null ? null :
1005 new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, device, null);
1010 * Checks if given timestamp is superseded by removal request
1011 * with more recent timestamp.
1013 * @param deviceId identifier of a device
1014 * @param timestampToCheck timestamp of an event to check
1015 * @return true if device is already removed
1017 private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
1018 Timestamp removalTimestamp = removalRequest.get(deviceId);
1019 if (removalTimestamp != null &&
1020 removalTimestamp.compareTo(timestampToCheck) >= 0) {
1021 // removalRequest is more recent
1028 * Returns a Device, merging description given from multiple Providers.
1030 * @param deviceId device identifier
1031 * @param providerDescs Collection of Descriptions from multiple providers
1032 * @return Device instance
1034 private Device composeDevice(DeviceId deviceId,
1035 Map<ProviderId, DeviceDescriptions> providerDescs) {
1037 checkArgument(!providerDescs.isEmpty(), "No device descriptions supplied");
1039 ProviderId primary = pickPrimaryPID(providerDescs);
1041 DeviceDescriptions desc = providerDescs.get(primary);
1043 final DeviceDescription base = desc.getDeviceDesc().value();
1044 Type type = base.type();
1045 String manufacturer = base.manufacturer();
1046 String hwVersion = base.hwVersion();
1047 String swVersion = base.swVersion();
1048 String serialNumber = base.serialNumber();
1049 ChassisId chassisId = base.chassisId();
1050 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1051 annotations = merge(annotations, base.annotations());
1053 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1054 if (e.getKey().equals(primary)) {
1057 // Note: should keep track of Description timestamp in the future
1058 // and only merge conflicting keys when timestamp is newer.
1059 // Currently assuming there will never be a key conflict between
1062 // annotation merging. not so efficient, should revisit later
1063 annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
1066 return new DefaultDevice(primary, deviceId, type, manufacturer,
1067 hwVersion, swVersion, serialNumber,
1068 chassisId, annotations);
1071 private Port buildTypedPort(Device device, PortNumber number, boolean isEnabled,
1072 PortDescription description, Annotations annotations) {
1073 switch (description.type()) {
1075 OmsPortDescription omsDesc = (OmsPortDescription) description;
1076 return new OmsPort(device, number, isEnabled, omsDesc.minFrequency(),
1077 omsDesc.maxFrequency(), omsDesc.grid(), annotations);
1079 OchPortDescription ochDesc = (OchPortDescription) description;
1080 return new OchPort(device, number, isEnabled, ochDesc.signalType(),
1081 ochDesc.isTunable(), ochDesc.lambda(), annotations);
1083 OduCltPortDescription oduDesc = (OduCltPortDescription) description;
1084 return new OduCltPort(device, number, isEnabled, oduDesc.signalType(), annotations);
1086 return new DefaultPort(device, number, isEnabled, description.type(),
1087 description.portSpeed(), annotations);
1092 * Returns a Port, merging description given from multiple Providers.
1094 * @param device device the port is on
1095 * @param number port number
1096 * @param descsMap Collection of Descriptions from multiple providers
1097 * @return Port instance
1099 private Port composePort(Device device, PortNumber number,
1100 Map<ProviderId, DeviceDescriptions> descsMap) {
1102 ProviderId primary = pickPrimaryPID(descsMap);
1103 DeviceDescriptions primDescs = descsMap.get(primary);
1104 // if no primary, assume not enabled
1105 boolean isEnabled = false;
1106 DefaultAnnotations annotations = DefaultAnnotations.builder().build();
1107 Timestamp newest = null;
1108 final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
1109 if (portDesc != null) {
1110 isEnabled = portDesc.value().isEnabled();
1111 annotations = merge(annotations, portDesc.value().annotations());
1112 newest = portDesc.timestamp();
1114 Port updated = null;
1115 for (Entry<ProviderId, DeviceDescriptions> e : descsMap.entrySet()) {
1116 if (e.getKey().equals(primary)) {
1119 // Note: should keep track of Description timestamp in the future
1120 // and only merge conflicting keys when timestamp is newer.
1121 // Currently assuming there will never be a key conflict between
1124 // annotation merging. not so efficient, should revisit later
1125 final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
1126 if (otherPortDesc != null) {
1127 if (newest != null && newest.isNewerThan(otherPortDesc.timestamp())) {
1130 annotations = merge(annotations, otherPortDesc.value().annotations());
1131 PortDescription other = otherPortDesc.value();
1132 updated = buildTypedPort(device, number, isEnabled, other, annotations);
1133 newest = otherPortDesc.timestamp();
1136 if (portDesc == null) {
1137 return updated == null ? new DefaultPort(device, number, false, annotations) : updated;
1139 PortDescription current = portDesc.value();
1140 return updated == null
1141 ? buildTypedPort(device, number, isEnabled, current, annotations)
1146 * @return primary ProviderID, or randomly chosen one if none exists
1148 private ProviderId pickPrimaryPID(
1149 Map<ProviderId, DeviceDescriptions> providerDescs) {
1150 ProviderId fallBackPrimary = null;
1151 for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
1152 if (!e.getKey().isAncillary()) {
1154 } else if (fallBackPrimary == null) {
1155 // pick randomly as a fallback in case there is no primary
1156 fallBackPrimary = e.getKey();
1159 return fallBackPrimary;
1162 private DeviceDescriptions getPrimaryDescriptions(
1163 Map<ProviderId, DeviceDescriptions> providerDescs) {
1164 ProviderId pid = pickPrimaryPID(providerDescs);
1165 return providerDescs.get(pid);
1168 private void unicastMessage(NodeId recipient, MessageSubject subject, Object event) throws IOException {
1169 clusterCommunicator.unicast(event, subject, SERIALIZER::encode, recipient);
1172 private void broadcastMessage(MessageSubject subject, Object event) {
1173 clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
1176 private void notifyPeers(InternalDeviceEvent event) {
1177 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1180 private void notifyPeers(InternalDeviceOfflineEvent event) {
1181 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1184 private void notifyPeers(InternalDeviceRemovedEvent event) {
1185 broadcastMessage(GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1188 private void notifyPeers(InternalPortEvent event) {
1189 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1192 private void notifyPeers(InternalPortStatusEvent event) {
1193 broadcastMessage(GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1196 private void notifyPeer(NodeId recipient, InternalDeviceEvent event) {
1198 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, event);
1199 } catch (IOException e) {
1200 log.error("Failed to send" + event + " to " + recipient, e);
1204 private void notifyPeer(NodeId recipient, InternalDeviceOfflineEvent event) {
1206 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, event);
1207 } catch (IOException e) {
1208 log.error("Failed to send" + event + " to " + recipient, e);
1212 private void notifyPeer(NodeId recipient, InternalDeviceRemovedEvent event) {
1214 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, event);
1215 } catch (IOException e) {
1216 log.error("Failed to send" + event + " to " + recipient, e);
1220 private void notifyPeer(NodeId recipient, InternalPortEvent event) {
1222 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_UPDATE, event);
1223 } catch (IOException e) {
1224 log.error("Failed to send" + event + " to " + recipient, e);
1228 private void notifyPeer(NodeId recipient, InternalPortStatusEvent event) {
1230 unicastMessage(recipient, GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, event);
1231 } catch (IOException e) {
1232 log.error("Failed to send" + event + " to " + recipient, e);
1236 private DeviceAntiEntropyAdvertisement createAdvertisement() {
1237 final NodeId self = clusterService.getLocalNode().id();
1239 final int numDevices = deviceDescs.size();
1240 Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
1241 final int portsPerDevice = 8; // random factor to minimize reallocation
1242 Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
1243 Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);
1245 deviceDescs.forEach((deviceId, devDescs) -> {
1247 // for each Device...
1248 synchronized (devDescs) {
1250 // send device offline timestamp
1251 Timestamp lOffline = this.offline.get(deviceId);
1252 if (lOffline != null) {
1253 adOffline.put(deviceId, lOffline);
1256 for (Entry<ProviderId, DeviceDescriptions>
1257 prov : devDescs.entrySet()) {
1259 // for each Provider Descriptions...
1260 final ProviderId provId = prov.getKey();
1261 final DeviceDescriptions descs = prov.getValue();
1263 adDevices.put(new DeviceFragmentId(deviceId, provId),
1264 descs.getDeviceDesc().timestamp());
1266 for (Entry<PortNumber, Timestamped<PortDescription>>
1267 portDesc : descs.getPortDescs().entrySet()) {
1269 final PortNumber number = portDesc.getKey();
1270 adPorts.put(new PortFragmentId(deviceId, provId, number),
1271 portDesc.getValue().timestamp());
1277 return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
1281 * Responds to anti-entropy advertisement message.
1283 * Notify sender about out-dated information using regular replication message.
1284 * Send back advertisement to sender if not in sync.
1286 * @param advertisement to respond to
1288 private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
1290 final NodeId sender = advertisement.sender();
1292 Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
1293 Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
1294 Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());
1296 // Fragments to request
1297 Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
1298 Collection<PortFragmentId> reqPorts = new ArrayList<>();
1300 for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
1301 final DeviceId deviceId = de.getKey();
1302 final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();
1304 synchronized (lDevice) {
1305 // latestTimestamp across provider
1306 // Note: can be null initially
1307 Timestamp localLatest = offline.get(deviceId);
1309 // handle device Ads
1310 for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
1311 final ProviderId provId = prov.getKey();
1312 final DeviceDescriptions lDeviceDescs = prov.getValue();
1314 final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);
1317 Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
1318 Timestamp advDevTimestamp = devAds.get(devFragId);
1320 if (advDevTimestamp == null || lProvDevice.isNewerThan(
1322 // remote does not have it or outdated, suggest
1323 notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
1324 } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
1325 // local is outdated, request
1326 reqDevices.add(devFragId);
1330 for (Entry<PortNumber, Timestamped<PortDescription>>
1331 pe : lDeviceDescs.getPortDescs().entrySet()) {
1333 final PortNumber num = pe.getKey();
1334 final Timestamped<PortDescription> lPort = pe.getValue();
1336 final PortFragmentId portFragId = new PortFragmentId(deviceId, provId, num);
1338 Timestamp advPortTimestamp = portAds.get(portFragId);
1339 if (advPortTimestamp == null || lPort.isNewerThan(
1340 advPortTimestamp)) {
1341 // remote does not have it or outdated, suggest
1342 notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
1343 } else if (!lPort.timestamp().equals(advPortTimestamp)) {
1344 // local is outdated, request
1345 log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
1346 reqPorts.add(portFragId);
1349 // remove port Ad already processed
1350 portAds.remove(portFragId);
1351 } // end local port loop
1353 // remove device Ad already processed
1354 devAds.remove(devFragId);
1356 // find latest and update
1357 final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
1358 if (localLatest == null ||
1359 providerLatest.compareTo(localLatest) > 0) {
1360 localLatest = providerLatest;
1362 } // end local provider loop
1364 // checking if remote timestamp is more recent.
1365 Timestamp rOffline = offlineAds.get(deviceId);
1366 if (rOffline != null &&
1367 rOffline.compareTo(localLatest) > 0) {
1368 // remote offline timestamp suggests that the
1369 // device is off-line
1370 markOfflineInternal(deviceId, rOffline);
1373 Timestamp lOffline = offline.get(deviceId);
1374 if (lOffline != null && rOffline == null) {
1375 // locally offline, but remote is online, suggest offline
1376 notifyPeer(sender, new InternalDeviceOfflineEvent(deviceId, lOffline));
1379 // remove device offline Ad already processed
1380 offlineAds.remove(deviceId);
1381 } // end local device loop
1384 // If there is any Ads left, request them
1385 log.trace("Ads left {}, {}", devAds, portAds);
1386 reqDevices.addAll(devAds.keySet());
1387 reqPorts.addAll(portAds.keySet());
1389 if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
1390 log.trace("Nothing to request to remote peer {}", sender);
1394 log.debug("Need to sync {} {}", reqDevices, reqPorts);
1396 // 2-way Anti-Entropy for now
1398 unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
1399 } catch (IOException e) {
1400 log.error("Failed to send response advertisement to " + sender, e);
1403 // Sketch of 3-way Anti-Entropy
1404 // DeviceAntiEntropyRequest request = new DeviceAntiEntropyRequest(self, reqDevices, reqPorts);
1405 // ClusterMessage message = new ClusterMessage(
1406 // clusterService.getLocalNode().id(),
1407 // GossipDeviceStoreMessageSubjects.DEVICE_REQUEST,
1408 // SERIALIZER.encode(request));
1411 // clusterCommunicator.unicast(message, advertisement.sender());
1412 // } catch (IOException e) {
1413 // log.error("Failed to send advertisement reply to "
1414 // + advertisement.sender(), e);
1418 private void notifyDelegateIfNotNull(DeviceEvent event) {
1419 if (event != null) {
1420 notifyDelegate(event);
1424 private final class SendAdvertisementTask implements Runnable {
1428 if (Thread.currentThread().isInterrupted()) {
1429 log.debug("Interrupted, quitting");
1434 final NodeId self = clusterService.getLocalNode().id();
1435 Set<ControllerNode> nodes = clusterService.getNodes();
1437 ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
1438 .transform(toNodeId())
1441 if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
1442 log.trace("No other peers in the cluster.");
1448 int idx = RandomUtils.nextInt(0, nodeIds.size());
1449 peer = nodeIds.get(idx);
1450 } while (peer.equals(self));
1452 DeviceAntiEntropyAdvertisement ad = createAdvertisement();
1454 if (Thread.currentThread().isInterrupted()) {
1455 log.debug("Interrupted, quitting");
1460 unicastMessage(peer, DEVICE_ADVERTISE, ad);
1461 } catch (IOException e) {
1462 log.debug("Failed to send anti-entropy advertisement to {}", peer);
1465 } catch (Exception e) {
1466 // catch all Exception to avoid Scheduled task being suppressed.
1467 log.error("Exception thrown while sending advertisement", e);
1472 private final class InternalDeviceEventListener
1473 implements ClusterMessageHandler {
1475 public void handle(ClusterMessage message) {
1476 log.debug("Received device update event from peer: {}", message.sender());
1477 InternalDeviceEvent event = SERIALIZER.decode(message.payload());
1479 ProviderId providerId = event.providerId();
1480 DeviceId deviceId = event.deviceId();
1481 Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
1484 notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription));
1485 } catch (Exception e) {
1486 log.warn("Exception thrown handling device update", e);
1491 private final class InternalDeviceOfflineEventListener
1492 implements ClusterMessageHandler {
1494 public void handle(ClusterMessage message) {
1495 log.debug("Received device offline event from peer: {}", message.sender());
1496 InternalDeviceOfflineEvent event = SERIALIZER.decode(message.payload());
1498 DeviceId deviceId = event.deviceId();
1499 Timestamp timestamp = event.timestamp();
1502 notifyDelegateIfNotNull(markOfflineInternal(deviceId, timestamp));
1503 } catch (Exception e) {
1504 log.warn("Exception thrown handling device offline", e);
1509 private final class InternalRemoveRequestListener
1510 implements ClusterMessageHandler {
1512 public void handle(ClusterMessage message) {
1513 log.debug("Received device remove request from peer: {}", message.sender());
1514 DeviceId did = SERIALIZER.decode(message.payload());
1518 } catch (Exception e) {
1519 log.warn("Exception thrown handling device remove", e);
1524 private final class InternalDeviceRemovedEventListener
1525 implements ClusterMessageHandler {
1527 public void handle(ClusterMessage message) {
1528 log.debug("Received device removed event from peer: {}", message.sender());
1529 InternalDeviceRemovedEvent event = SERIALIZER.decode(message.payload());
1531 DeviceId deviceId = event.deviceId();
1532 Timestamp timestamp = event.timestamp();
1535 notifyDelegateIfNotNull(removeDeviceInternal(deviceId, timestamp));
1536 } catch (Exception e) {
1537 log.warn("Exception thrown handling device removed", e);
1542 private final class InternalPortEventListener
1543 implements ClusterMessageHandler {
1545 public void handle(ClusterMessage message) {
1547 log.debug("Received port update event from peer: {}", message.sender());
1548 InternalPortEvent event = SERIALIZER.decode(message.payload());
1550 ProviderId providerId = event.providerId();
1551 DeviceId deviceId = event.deviceId();
1552 Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
1554 if (getDevice(deviceId) == null) {
1555 log.debug("{} not found on this node yet, ignoring.", deviceId);
1556 // Note: dropped information will be recovered by anti-entropy
1561 notifyDelegate(updatePortsInternal(providerId, deviceId, portDescriptions));
1562 } catch (Exception e) {
1563 log.warn("Exception thrown handling port update", e);
1568 private final class InternalPortStatusEventListener
1569 implements ClusterMessageHandler {
1571 public void handle(ClusterMessage message) {
1573 log.debug("Received port status update event from peer: {}", message.sender());
1574 InternalPortStatusEvent event = SERIALIZER.decode(message.payload());
1576 ProviderId providerId = event.providerId();
1577 DeviceId deviceId = event.deviceId();
1578 Timestamped<PortDescription> portDescription = event.portDescription();
1580 if (getDevice(deviceId) == null) {
1581 log.debug("{} not found on this node yet, ignoring.", deviceId);
1582 // Note: dropped information will be recovered by anti-entropy
1587 notifyDelegateIfNotNull(updatePortStatusInternal(providerId, deviceId, portDescription));
1588 } catch (Exception e) {
1589 log.warn("Exception thrown handling port update", e);
1594 private final class InternalDeviceAdvertisementListener
1595 implements ClusterMessageHandler {
1597 public void handle(ClusterMessage message) {
1598 log.trace("Received Device Anti-Entropy advertisement from peer: {}", message.sender());
1599 DeviceAntiEntropyAdvertisement advertisement = SERIALIZER.decode(message.payload());
1601 handleAdvertisement(advertisement);
1602 } catch (Exception e) {
1603 log.warn("Exception thrown handling Device advertisements.", e);
1608 private final class DeviceInjectedEventListener
1609 implements ClusterMessageHandler {
1611 public void handle(ClusterMessage message) {
1612 log.debug("Received injected device event from peer: {}", message.sender());
1613 DeviceInjectedEvent event = SERIALIZER.decode(message.payload());
1615 ProviderId providerId = event.providerId();
1616 DeviceId deviceId = event.deviceId();
1617 DeviceDescription deviceDescription = event.deviceDescription();
1618 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1619 // workaround for ONOS-1208
1620 log.warn("Not ready to accept update. Dropping {}", deviceDescription);
1625 createOrUpdateDevice(providerId, deviceId, deviceDescription);
1626 } catch (Exception e) {
1627 log.warn("Exception thrown handling device injected event.", e);
1632 private final class PortInjectedEventListener
1633 implements ClusterMessageHandler {
1635 public void handle(ClusterMessage message) {
1636 log.debug("Received injected port event from peer: {}", message.sender());
1637 PortInjectedEvent event = SERIALIZER.decode(message.payload());
1639 ProviderId providerId = event.providerId();
1640 DeviceId deviceId = event.deviceId();
1641 List<PortDescription> portDescriptions = event.portDescriptions();
1642 if (!deviceClockService.isTimestampAvailable(deviceId)) {
1643 // workaround for ONOS-1208
1644 log.warn("Not ready to accept update. Dropping {}", portDescriptions);
1649 updatePorts(providerId, deviceId, portDescriptions);
1650 } catch (Exception e) {
1651 log.warn("Exception thrown handling port injected event.", e);
1656 private class InternalPortStatsListener
1657 implements EventuallyConsistentMapListener<DeviceId, Map<PortNumber, PortStatistics>> {
1659 public void event(EventuallyConsistentMapEvent<DeviceId, Map<PortNumber, PortStatistics>> event) {
1660 if (event.type() == PUT) {
1661 Device device = devices.get(event.key());
1662 if (device != null) {
1663 delegate.notify(new DeviceEvent(PORT_STATS_UPDATED, device));